From 0cfeadadf47f2df9aa6cb9ab05fc6d4e0e81c470 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Sun, 21 Jun 2020 10:17:11 +0300 Subject: [PATCH] Websockets + TLS for Async-Std / Tokio (#30) * fix: relax trait bounds on JsonRpcClient * refactor(provider): move http client to separate dir * feat(provider): add initial Websocket support over Stream/Sink + Tungstenite * test(provider): add websocket test * feat(provider): add convenience method using tokio/async-std behind a feature flag * test(provider): add websocket ssl test * feat(provider): add TLS websockets for tokio/async-std * docs(provider): add websocket docs / examples * fix(provider): make tokio an optional dep --- Cargo.lock | 391 ++++++++++++++++++ ethers-contract/src/contract.rs | 10 +- ethers-providers/Cargo.toml | 40 +- ethers-providers/src/lib.rs | 55 ++- ethers-providers/src/provider.rs | 3 +- ethers-providers/src/stream.rs | 39 +- ethers-providers/src/transports/common.rs | 85 ++++ ethers-providers/src/{ => transports}/http.rs | 91 +--- ethers-providers/src/transports/mod.rs | 7 + ethers-providers/src/transports/ws.rs | 194 +++++++++ ethers-providers/tests/provider.rs | 77 +++- ethers-signers/src/client.rs | 10 +- ethers/Cargo.toml | 1 + ethers/examples/watch_blocks.rs | 4 +- 14 files changed, 877 insertions(+), 130 deletions(-) create mode 100644 ethers-providers/src/transports/common.rs rename ethers-providers/src/{ => transports}/http.rs (58%) create mode 100644 ethers-providers/src/transports/mod.rs create mode 100644 ethers-providers/src/transports/ws.rs diff --git a/Cargo.lock b/Cargo.lock index d328cd89..29219ecd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,58 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" +[[package]] +name = "async-attributes" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423" +dependencies = [ + "quote", + "syn", +] + +[[package]] +name = "async-std" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00d68a33ebc8b57800847d00787307f84a562224a14db069b0acefe4c2abbf5d" +dependencies = [ + "async-attributes", + "async-task", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "kv-log-macro", + "log", + "memchr", + "num_cpus", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "smol", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17772156ef2829aadc587461c7753af20b7e8db1529bc66855add962a3b35d3" + +[[package]] +name = "async-tls" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95fd83426b89b034bf4e9ceb9c533c2f2386b813fd3dcae0a425ec6f1837d78a" +dependencies = [ + "futures", + "rustls", + "webpki", + "webpki-roots", +] + [[package]] name = "async-trait" version = "0.1.31" @@ -48,6 +100,24 @@ dependencies = [ "syn", ] +[[package]] +name = "async-tungstenite" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ea20d9a2b068769745fbe27942952ea2b9a3967c52b5f0139637c7eca6f547" +dependencies = [ + "async-std", + "async-tls", + "futures-io", + "futures-util", + "log", + "native-tls", + "pin-project", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "autocfg" version = "1.0.0" @@ -113,6 +183,19 @@ dependencies = [ "byte-tools", ] +[[package]] +name = "blocking" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d17efb70ce4421e351d61aafd90c16a20fb5bfe339fcdc32a86816280e62ce0" +dependencies = [ + "futures-channel", + "futures-util", + "once_cell", + "parking", + "waker-fn", +] + [[package]] name = "bumpalo" version = "3.3.0" @@ -143,6 +226,12 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +[[package]] +name = "cache-padded" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24508e28c677875c380c20f4d28124fab6f8ed4ef929a1397d7b1a31e92f1005" + [[package]] name = "cc" version = "1.0.41" @@ -164,6 +253,42 @@ dependencies = [ "bitflags", ] +[[package]] +name = "concurrent-queue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83c06aff61f2d899eb87c379df3cbf7876f14471dcab474e0b6dc90ab96c080" +dependencies = [ + "cache-padded", +] + +[[package]] +name = "core-foundation" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if", + "lazy_static", +] + [[package]] name = "crunchy" version = "0.2.2" @@ -369,18 +494,24 @@ dependencies = [ name = "ethers-providers" version = "0.1.3" dependencies = [ + "async-std", + "async-tls", "async-trait", + "async-tungstenite", "ethers", "ethers-core", "futures-core", + "futures-timer", "futures-util", "pin-project", "reqwest", "rustc-hex", "serde", "serde_json", + "serial_test", "thiserror", "tokio", + "tokio-native-tls", "url", ] @@ -404,6 +535,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +[[package]] +name = "fastrand" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a64b0126b293b050395b37b10489951590ed024c03d7df4f249d219c8ded7cbf" + [[package]] name = "fixed-hash" version = "0.6.1" @@ -422,6 +559,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "fuchsia-zircon" version = "0.3.3" @@ -513,6 +665,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.5" @@ -578,6 +736,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "hermit-abi" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9586eedd4ce6b3c498bc3b4dd92fc9f11166aa908a914071953768066c67909" +dependencies = [ + "libc", +] + [[package]] name = "hmac" version = "0.7.1" @@ -713,6 +880,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "input_buffer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" +dependencies = [ + "bytes", +] + [[package]] name = "iovec" version = "0.1.4" @@ -747,6 +923,15 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kv-log-macro" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ff57d6d215f7ca7eb35a9a64d656ba4d9d2bef114d741dc08048e75e2f5d418" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -864,6 +1049,24 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "native-tls" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b0d88c06fe90d5ee94048ba40409ef1d9315d86f6f38c2efdaad4fb50c58b2d" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "net2" version = "0.2.34" @@ -875,6 +1078,16 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.4.0" @@ -887,6 +1100,20 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" +[[package]] +name = "openssl" +version = "0.10.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee6d85f4cb4c4f59a6a85d5b68a233d280c82e29e822913b9c8b129fbf20bdd" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "lazy_static", + "libc", + "openssl-sys", +] + [[package]] name = "openssl-probe" version = "0.1.2" @@ -918,6 +1145,12 @@ dependencies = [ "serde", ] +[[package]] +name = "parking" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7fad362df89617628a7508b3e9d588ade1b0ac31aa25de168193ad999c2dd4" + [[package]] name = "parking_lot" version = "0.10.2" @@ -980,6 +1213,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01608bfa680dafb103f9207fa944facf572e4e3e708d10de19a0d0c3d36e5f18" +dependencies = [ + "crossbeam-utils", + "futures-io", + "futures-sink", + "futures-util", +] + [[package]] name = "pkg-config" version = "0.3.17" @@ -1106,6 +1351,15 @@ version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi 0.3.8", +] + [[package]] name = "reqwest" version = "0.10.6" @@ -1201,6 +1455,12 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1217,6 +1477,29 @@ dependencies = [ "untrusted", ] +[[package]] +name = "security-framework" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17bf11d99252f512695eb468de5516e5cf75455521e69dfe343f3b74e4748405" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.112" @@ -1282,6 +1565,18 @@ dependencies = [ "syn", ] +[[package]] +name = "sha-1" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df" +dependencies = [ + "block-buffer", + "digest", + "fake-simd", + "opaque-debug", +] + [[package]] name = "sha2" version = "0.8.2" @@ -1306,6 +1601,27 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" +[[package]] +name = "smol" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c3546640e104d1ee544df9c08ac4896d44ef1de89a6d7cc4a3c22f677d5ef38" +dependencies = [ + "async-task", + "blocking", + "concurrent-queue", + "fastrand", + "futures-io", + "futures-util", + "libc", + "once_cell", + "piper", + "scoped-tls", + "slab", + "socket2", + "wepoll-binding", +] + [[package]] name = "socket2" version = "0.3.12" @@ -1353,6 +1669,20 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tempfile" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +dependencies = [ + "cfg-if", + "libc", + "rand", + "redox_syscall", + "remove_dir_all", + "winapi 0.3.8", +] + [[package]] name = "thiserror" version = "1.0.19" @@ -1439,6 +1769,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd608593a919a8e05a7d1fc6df885e40f6a88d3a70a3a7eff23ff27964eda069" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.13.1" @@ -1477,6 +1817,26 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" +[[package]] +name = "tungstenite" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5c7d464221cb0b538a1cd12f6d9127ed1e6bb7f3ffca98fb3cd4c6e3af8175c" +dependencies = [ + "base64 0.12.1", + "byteorder", + "bytes", + "http", + "httparse", + "input_buffer", + "log", + "native-tls", + "rand", + "sha-1", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.12.0" @@ -1545,6 +1905,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" + [[package]] name = "vcpkg" version = "0.2.8" @@ -1557,6 +1923,12 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" +[[package]] +name = "waker-fn" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9571542c2ce85ce642e6b58b3364da2fb53526360dfb7c211add4f5c23105ff7" + [[package]] name = "want" version = "0.3.0" @@ -1670,6 +2042,25 @@ dependencies = [ "webpki", ] +[[package]] +name = "wepoll-binding" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374fff4ff9701ff8b6ad0d14bacd3156c44063632d8c136186ff5967d48999a7" +dependencies = [ + "bitflags", + "wepoll-sys", +] + +[[package]] +name = "wepoll-sys" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9082a777aed991f6769e2b654aa0cb29f1c3d615daf009829b07b66c7aff6a24" +dependencies = [ + "cc", +] + [[package]] name = "winapi" version = "0.2.8" diff --git a/ethers-contract/src/contract.rs b/ethers-contract/src/contract.rs index e5a5cb23..002ed8b3 100644 --- a/ethers-contract/src/contract.rs +++ b/ethers-contract/src/contract.rs @@ -260,7 +260,10 @@ where /// Returns a new contract instance at `address`. /// /// Clones `self` internally - pub fn at>(&self, address: T) -> Self { + pub fn at>(&self, address: T) -> Self + where + P: Clone, + { let mut this = self.clone(); this.address = address.into(); this @@ -269,7 +272,10 @@ where /// Returns a new contract instance using the provided client /// /// Clones `self` internally - pub fn connect(&self, client: &'a Client) -> Self { + pub fn connect(&self, client: &'a Client) -> Self + where + P: Clone, + { let mut this = self.clone(); this.client = client; this diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 91b3aaae..2a762489 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -9,6 +9,10 @@ homepage = "https://docs.rs/ethers" repository = "https://github.com/gakonst/ethers-rs" keywords = ["ethereum", "web3", "celo", "ethers"] +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + [dependencies] ethers-core = { version = "0.1.3", path = "../ethers-core" } @@ -23,17 +27,45 @@ url = { version = "2.1.1", default-features = false } futures-core = { version = "0.3.5", default-features = false } futures-util = { version = "0.3.5", default-features = false } pin-project = { version = "0.4.20", default-features = false } -tokio = { version = "0.2.21", default-features = false, features = ["time"] } +async-tungstenite = { version = "0.6.0", default-features = false } + +# ws support async-std and tokio runtimes for the convenience methods +async-std = { version = "1.6.2", default-features = false, optional = true } +tokio = { version = "0.2.21", default-features = false, optional = true } + +# needed for tls +real-tokio-native-tls = { package = "tokio-native-tls", version = "0.1.0", optional = true } +async-tls = { version = "0.7.0", optional = true } +futures-timer = "3.0.2" [dev-dependencies] ethers = { version = "0.1.3", path = "../ethers" } rustc-hex = "2.1.0" tokio = { version = "0.2.21", default-features = false, features = ["rt-core", "macros"] } +async-std = { version = "1.6.2", default-features = false, features = ["attributes"] } +async-tungstenite = { version = "0.6.0", features = ["tokio-runtime"] } +serial_test = "0.4.0" [features] celo = ["ethers-core/celo"] -[package.metadata.docs.rs] -all-features = true -rustdoc-args = ["--cfg", "docsrs"] +tokio-runtime = [ + "tokio", + "async-tungstenite/tokio-runtime" +] +tokio-tls = [ + "tokio-runtime", + "async-tungstenite/tokio-native-tls", + "real-tokio-native-tls" +] + +async-std-runtime = [ + "async-std", + "async-tungstenite/async-std-runtime" +] +async-std-tls = [ + "async-std-runtime", + "async-tungstenite/async-tls", + "async-tls" +] diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs index a02e3441..e95036c3 100644 --- a/ethers-providers/src/lib.rs +++ b/ethers-providers/src/lib.rs @@ -28,6 +28,55 @@ //! # } //! ``` //! +//! # Websockets +//! +//! The crate has support for WebSockets. If none of the provided async runtime +//! features are enabled, you must manually instantiate the WS connection and wrap +//! it with with a [`Ws::new`](method@crate::Ws::new) call. +//! +//! ```ignore +//! use ethers::providers::Ws; +//! +//! let ws = Ws::new(...); +//! ``` +//! +//! If you have compiled the library with any of the following features, you may +//! instantiate the websocket instance with the `connect` call and your URL: +//! - `tokio-runtime`: Uses `tokio` as the runtime +//! - `async-std-runtime`: Uses `async-std-runtime` +//! +//! ```no_run +//! # #[cfg(any( +//! # feature = "tokio-runtime", +//! # feature = "tokio-tls", +//! # feature = "async-std-runtime", +//! # feature = "async-std-tls", +//! # ))] +//! # async fn foo() -> Result<(), Box> { +//! # use ethers::providers::Ws; +//! let ws = Ws::connect("ws://localhost:8545").await?; +//! # Ok(()) +//! # } +//! ``` +//! +//! TLS support is also provided via the following feature flags: +//! - `tokio-tls` +//! - `async-tls` +//! +//! ```no_run +//! # #[cfg(any( +//! # feature = "tokio-runtime", +//! # feature = "tokio-tls", +//! # feature = "async-std-runtime", +//! # feature = "async-std-tls", +//! # ))] +//! # async fn foo() -> Result<(), Box> { +//! # use ethers::providers::Ws; +//! let ws = Ws::connect("wss://localhost:8545").await?; +//! # Ok(()) +//! # } +//! ``` +//! //! # Ethereum Name Service //! //! The provider may also be used to resolve [Ethereum Name Service](https://ens.domains) (ENS) names @@ -50,8 +99,8 @@ //! # Ok(()) //! # } //! ``` -mod http; -pub use http::Provider as Http; +mod transports; +pub use transports::{Http, Ws}; mod provider; @@ -76,7 +125,7 @@ pub use provider::{Provider, ProviderError}; #[async_trait] /// Trait which must be implemented by data transports to be used with the Ethereum /// JSON-RPC provider. -pub trait JsonRpcClient: Debug + Clone + Send + Sync { +pub trait JsonRpcClient: Send + Sync { /// A JSON-RPC Error type Error: Error + Into; diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index b9559d71..bafed72c 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -1,8 +1,7 @@ use crate::{ ens, - http::Provider as HttpProvider, stream::{FilterStream, FilterWatcher}, - JsonRpcClient, PendingTransaction, + Http as HttpProvider, JsonRpcClient, PendingTransaction, }; use ethers_core::{ diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index 63b05d5b..3517523c 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -3,7 +3,8 @@ use crate::ProviderError; use ethers_core::types::U256; use futures_core::{stream::Stream, TryFuture}; -use futures_util::StreamExt; +use futures_timer::Delay; +use futures_util::{stream, FutureExt, StreamExt}; use pin_project::pin_project; use serde::Deserialize; use std::{ @@ -13,7 +14,11 @@ use std::{ time::Duration, vec::IntoIter, }; -use tokio::time::{interval, Interval}; + +// https://github.com/tomusdrw/rust-web3/blob/befcb2fb8f3ca0a43e3081f68886fa327e64c8e6/src/api/eth_filter.rs#L20 +fn interval(duration: Duration) -> impl Stream + Send + Unpin { + stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop) +} const DEFAULT_POLL_DURATION: Duration = Duration::from_millis(7000); @@ -54,7 +59,7 @@ pub(crate) struct FilterWatcher { factory: F, // The polling interval - interval: Interval, + interval: Box + Send + Unpin>, state: FilterWatcherState, } @@ -68,7 +73,7 @@ where pub fn new>(id: T, factory: F) -> Self { Self { id: id.into(), - interval: interval(DEFAULT_POLL_DURATION), + interval: Box::new(interval(DEFAULT_POLL_DURATION)), state: FilterWatcherState::WaitForInterval, factory, } @@ -86,7 +91,7 @@ where } fn interval>(mut self, duration: T) -> Self { - self.interval = interval(Duration::from_millis(duration.into())); + self.interval = Box::new(interval(Duration::from_millis(duration.into()))); self } } @@ -107,21 +112,18 @@ where *this.state = match this.state { FilterWatcherState::WaitForInterval => { // Wait the polling period - let mut interval = Box::pin(this.interval.tick()); - let _ready = futures_util::ready!(interval.as_mut().poll(cx)); + let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx)); // create a new instance of the future + cx.waker().wake_by_ref(); FilterWatcherState::GetFilterChanges(this.factory.as_mut().new()) } FilterWatcherState::GetFilterChanges(fut) => { - // wait for the future to be ready - let mut fut = Box::pin(fut); - // NOTE: If the provider returns an error, this will return an empty // vector. Should we make this return a Result instead? Ideally if we're // in a streamed loop we wouldn't want the loop to terminate if an error // is encountered (since it might be a temporary error). - let items: Vec = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); + let items: Vec = futures_util::ready!(fut.poll_unpin(cx)).unwrap_or_default(); FilterWatcherState::NextItem(items.into_iter()) } // Consume 1 element from the vector. If more elements are in the vector, @@ -130,7 +132,10 @@ where // for new logs FilterWatcherState::NextItem(iter) => match iter.next() { Some(item) => return Poll::Ready(Some(item)), - None => FilterWatcherState::WaitForInterval, + None => { + cx.waker().wake_by_ref(); + FilterWatcherState::WaitForInterval + } }, }; @@ -184,14 +189,16 @@ mod watch { async fn stream() { let factory = || Box::pin(async { Ok::, ProviderError>(vec![1, 2, 3]) }); let filter = FilterWatcher::<_, u64>::new(1, factory); - let mut stream = filter.interval(1u64).stream(); - assert_eq!(stream.next().await.unwrap(), 1); + // stream combinator calls are still doable since FilterStream extends + // Stream and StreamExt + let mut stream = filter.interval(100u64).stream().map(|x| 2 * x); assert_eq!(stream.next().await.unwrap(), 2); - assert_eq!(stream.next().await.unwrap(), 3); + assert_eq!(stream.next().await.unwrap(), 4); + assert_eq!(stream.next().await.unwrap(), 6); // this will poll the factory function again since it consumed the entire // vector, so it'll wrap around. Realistically, we'd then sleep for a few seconds // until new blocks are mined, until the call to the factory returns a non-empty // vector of logs - assert_eq!(stream.next().await.unwrap(), 1); + assert_eq!(stream.next().await.unwrap(), 2); } } diff --git a/ethers-providers/src/transports/common.rs b/ethers-providers/src/transports/common.rs new file mode 100644 index 00000000..d65b7d65 --- /dev/null +++ b/ethers-providers/src/transports/common.rs @@ -0,0 +1,85 @@ +// Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::fmt; +use thiserror::Error; + +#[derive(Serialize, Deserialize, Debug, Clone, Error)] +/// A JSON-RPC 2.0 error +pub struct JsonRpcError { + /// The error code + pub code: i64, + /// The error message + pub message: String, + /// Additional data + pub data: Option, +} + +impl fmt::Display for JsonRpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "(code: {}, message: {}, data: {:?})", + self.code, self.message, self.data + ) + } +} + +#[derive(Serialize, Deserialize, Debug)] +/// A JSON-RPC request +pub struct Request<'a, T> { + id: u64, + jsonrpc: &'a str, + method: &'a str, + params: T, +} + +impl<'a, T> Request<'a, T> { + /// Creates a new JSON RPC request + pub fn new(id: u64, method: &'a str, params: T) -> Self { + Self { + id, + jsonrpc: "2.0", + method, + params, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Response { + id: u64, + jsonrpc: String, + #[serde(flatten)] + pub data: ResponseData, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum ResponseData { + Error { error: JsonRpcError }, + Success { result: R }, +} + +impl ResponseData { + /// Consume response and return value + pub fn into_result(self) -> Result { + match self { + ResponseData::Success { result } => Ok(result), + ResponseData::Error { error } => Err(error), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn response() { + let response: Response = + serde_json::from_str(r#"{"jsonrpc": "2.0", "result": 19, "id": 1}"#).unwrap(); + assert_eq!(response.id, 1); + assert_eq!(response.data.into_result().unwrap(), 19); + } +} diff --git a/ethers-providers/src/http.rs b/ethers-providers/src/transports/http.rs similarity index 58% rename from ethers-providers/src/http.rs rename to ethers-providers/src/transports/http.rs index fc2ad66a..cea58f0b 100644 --- a/ethers-providers/src/http.rs +++ b/ethers-providers/src/transports/http.rs @@ -4,15 +4,15 @@ use crate::{provider::ProviderError, JsonRpcClient}; use async_trait::async_trait; use reqwest::{Client, Error as ReqwestError}; use serde::{Deserialize, Serialize}; -use serde_json::Value; use std::{ - fmt, str::FromStr, sync::atomic::{AtomicU64, Ordering}, }; use thiserror::Error; use url::Url; +use super::common::{JsonRpcError, Request, Response}; + /// A low-level JSON-RPC Client over HTTP. /// /// # Example @@ -42,7 +42,7 @@ pub enum ClientError { ReqwestError(#[from] ReqwestError), #[error(transparent)] /// Thrown if the response could not be parsed - JsonRpcError(#[from] errors::JsonRpcError), + JsonRpcError(#[from] JsonRpcError), } impl From for ProviderError { @@ -118,88 +118,3 @@ impl Clone for Provider { } } } - -// leak private type w/o exposing it -mod errors { - use super::*; - - #[derive(Serialize, Deserialize, Debug, Clone, Error)] - /// A JSON-RPC 2.0 error - pub struct JsonRpcError { - /// The error code - pub code: i64, - /// The error message - pub message: String, - /// Additional data - pub data: Option, - } - - impl fmt::Display for JsonRpcError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "(code: {}, message: {}, data: {:?})", - self.code, self.message, self.data - ) - } - } -} - -#[derive(Serialize, Deserialize, Debug)] -/// A JSON-RPC request -struct Request<'a, T> { - id: u64, - jsonrpc: &'a str, - method: &'a str, - params: T, -} - -impl<'a, T> Request<'a, T> { - /// Creates a new JSON RPC request - fn new(id: u64, method: &'a str, params: T) -> Self { - Self { - id, - jsonrpc: "2.0", - method, - params, - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -struct Response { - id: u64, - jsonrpc: String, - #[serde(flatten)] - data: ResponseData, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(untagged)] -enum ResponseData { - Error { error: errors::JsonRpcError }, - Success { result: R }, -} - -impl ResponseData { - /// Consume response and return value - fn into_result(self) -> Result { - match self { - ResponseData::Success { result } => Ok(result), - ResponseData::Error { error } => Err(error), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn response() { - let response: Response = - serde_json::from_str(r#"{"jsonrpc": "2.0", "result": 19, "id": 1}"#).unwrap(); - assert_eq!(response.id, 1); - assert_eq!(response.data.into_result().unwrap(), 19); - } -} diff --git a/ethers-providers/src/transports/mod.rs b/ethers-providers/src/transports/mod.rs new file mode 100644 index 00000000..5855e465 --- /dev/null +++ b/ethers-providers/src/transports/mod.rs @@ -0,0 +1,7 @@ +mod common; + +mod http; +pub use http::Provider as Http; + +mod ws; +pub use ws::Provider as Ws; diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs new file mode 100644 index 00000000..15d938bc --- /dev/null +++ b/ethers-providers/src/transports/ws.rs @@ -0,0 +1,194 @@ +use crate::{provider::ProviderError, JsonRpcClient}; + +use async_trait::async_trait; +use async_tungstenite::tungstenite::{self, protocol::Message}; +use futures_util::{ + lock::Mutex, + sink::{Sink, SinkExt}, + stream::{Stream, StreamExt}, +}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; +use std::sync::atomic::{AtomicU64, Ordering}; +use thiserror::Error; + +use super::common::{JsonRpcError, Request, ResponseData}; + +// Convenience methods for connecting with async-std/tokio: + +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] +use async_tungstenite::WebSocketStream; + +// connect_async +#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))] +use async_tungstenite::async_std::connect_async; +#[cfg(feature = "tokio-runtime")] +use async_tungstenite::tokio::{connect_async, TokioAdapter}; + +#[cfg(feature = "tokio-runtime")] +type TcpStream = TokioAdapter; +#[cfg(all(feature = "async-std-runtime", not(feature = "tokio-runtime")))] +type TcpStream = async_std::net::TcpStream; + +// If there is no TLS, just use the TCP Stream +#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-tls")))] +pub type MaybeTlsStream = TcpStream; +#[cfg(all(feature = "async-std-runtime", not(feature = "async-std-tls")))] +pub type MaybeTlsStream = TcpStream; + +// Use either +#[cfg(feature = "tokio-tls")] +type TlsStream = real_tokio_native_tls::TlsStream; +#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))] +type TlsStream = async_tls::client::TlsStream; + +#[cfg(any(feature = "tokio-tls", feature = "async-std-tls"))] +pub use async_tungstenite::stream::Stream as StreamSwitcher; +#[cfg(feature = "tokio-tls")] +pub type MaybeTlsStream = + StreamSwitcher>>>; +#[cfg(all(feature = "async-std-tls", not(feature = "tokio-tls")))] +pub type MaybeTlsStream = StreamSwitcher>; + +/// A JSON-RPC Client over Websockets. +/// +/// If the library is not compiled with any runtime support, then you will have +/// to manually instantiate a websocket connection and call `Provider::new` on it. +/// +/// ```ignore +/// use ethers::providers::Ws; +/// +/// let ws = Ws::new(...) +/// ``` +/// +/// If you have compiled the library with any of the following features, you may +/// instantiate the websocket instance with the `connect` call and your URL: +/// - `tokio-runtime`: Uses `tokio` as the runtime +/// - `tokio-tls`: Same as `tokio-runtime` but with TLS support +/// - `async-std-runtime`: Uses `async-std-runtime` +/// - `async-tls`: Same as `async-std-runtime` but with TLS support +/// +/// ```no_run +/// # #[cfg(any( +/// # feature = "tokio-runtime", +/// # feature = "tokio-tls", +/// # feature = "async-std-runtime", +/// # feature = "async-std-tls", +/// # ))] +/// # async fn foo() -> Result<(), Box> { +/// use ethers::providers::Ws; +/// +/// let ws = Ws::connect("ws://localhost:8545").await?; +/// +/// // If built with TLS support (otherwise will get a "TLS Support not compiled in" error) +/// let ws = Ws::connect("wss://localhost:8545").await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// This feature is built using [`async-tungstenite`](https://docs.rs/async-tungstenite). If you need other runtimes, +/// consider importing `async-tungstenite` with the [corresponding feature +/// flag](https://github.com/sdroege/async-tungstenite/blob/master/Cargo.toml#L15-L22) +/// for your runtime. +pub struct Provider { + id: AtomicU64, + ws: Mutex, +} + +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] +impl Provider> { + /// Initializes a new WebSocket Client. + /// separately. + pub async fn connect( + url: impl tungstenite::client::IntoClientRequest + Unpin, + ) -> Result { + let (ws, _) = connect_async(url).await?; + Ok(Self::new(ws)) + } +} + +impl Provider +where + S: Send + + Sync + + Stream> + + Sink + + Unpin, +{ + /// Initializes a new WebSocket Client. The websocket connection must be initiated + /// separately. + pub fn new(ws: S) -> Self { + Self { + id: AtomicU64::new(0), + ws: Mutex::new(ws), + } + } +} + +#[derive(Error, Debug)] +/// Error thrown when sending a WS message +pub enum ClientError { + /// Thrown if deserialization failed + #[error(transparent)] + JsonError(#[from] serde_json::Error), + + #[error(transparent)] + /// Thrown if the response could not be parsed + JsonRpcError(#[from] JsonRpcError), + + /// Thrown if the websocket didn't respond to our message + #[error("Websocket connection did not respond with data")] + NoResponse, + + /// Thrown if there's an error over the WS connection + #[error(transparent)] + TungsteniteError(#[from] tungstenite::Error), +} + +impl From for ProviderError { + fn from(src: ClientError) -> Self { + ProviderError::JsonRpcClientError(Box::new(src)) + } +} + +#[async_trait] +impl JsonRpcClient for Provider +where + S: Send + + Sync + + Stream> + + Sink + + Unpin, +{ + type Error = ClientError; + + /// Sends a POST request with the provided method and the params serialized as JSON + /// over WebSockets + async fn request Deserialize<'a>>( + &self, + method: &str, + params: T, + ) -> Result { + // we get a lock on the websocket to avoid race conditions with multiple borrows + let mut lock = self.ws.lock().await; + + let next_id = self.id.load(Ordering::SeqCst) + 1; + self.id.store(next_id, Ordering::SeqCst); + + // send the message + let payload = serde_json::to_string(&Request::new(next_id, method, params))?; + lock.send(Message::text(payload)).await?; + + // get the response bytes + let resp = lock.next().await.ok_or(ClientError::NoResponse)??; + + let data: ResponseData = match resp { + Message::Text(inner) => serde_json::from_str(&inner)?, + Message::Binary(inner) => serde_json::from_slice(&inner)?, + // TODO: Should we do something if we receive a Ping, Pong or Close? + _ => return Err(ClientError::NoResponse), + }; + + Ok(data.into_result()?) + } +} diff --git a/ethers-providers/tests/provider.rs b/ethers-providers/tests/provider.rs index 3bcdb9d9..7189b534 100644 --- a/ethers-providers/tests/provider.rs +++ b/ethers-providers/tests/provider.rs @@ -1,25 +1,80 @@ +#![allow(unused_braces)] use ethers::providers::{Http, Provider}; use std::convert::TryFrom; -#[tokio::test] #[cfg(not(feature = "celo"))] -async fn pending_txs_with_confirmations_ganache() { +mod eth_tests { + use super::*; use ethers::{ types::TransactionRequest, utils::{parse_ether, Ganache}, }; + use serial_test::serial; - let _ganache = Ganache::new().block_time(2u64).spawn(); - let provider = Provider::::try_from("http://localhost:8545").unwrap(); - let accounts = provider.get_accounts().await.unwrap(); + // Without TLS this would error with "TLS Support not compiled in" + #[test] + #[cfg(any(feature = "async-std-tls", feature = "tokio-tls"))] + fn ssl_websocket() { + // this is extremely ugly but I couldn't figure out a better way of having + // a shared async test for both runtimes + #[cfg(feature = "async-std-tls")] + let block_on = async_std::task::block_on; + #[cfg(feature = "tokio-tls")] + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + #[cfg(feature = "tokio-tls")] + let mut block_on = |x| runtime.block_on(x); - let tx = TransactionRequest::pay(accounts[1], parse_ether(1u64).unwrap()).from(accounts[0]); - let pending_tx = provider.send_transaction(tx).await.unwrap(); - let hash = *pending_tx; - let receipt = pending_tx.confirmations(5).await.unwrap(); + use ethers::providers::Ws; + block_on(async move { + let ws = Ws::connect("wss://rinkeby.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27") + .await + .unwrap(); + let provider = Provider::new(ws); + let _number = provider.get_block_number().await.unwrap(); + }); + } - // got the correct receipt - assert_eq!(receipt.transaction_hash, hash); + #[tokio::test] + #[serial] + #[cfg(feature = "tokio-runtime")] + async fn watch_blocks_websocket() { + use ethers::{ + providers::{FilterStream, StreamExt, Ws}, + types::H256, + }; + + let _ganache = Ganache::new().block_time(2u64).spawn(); + let (ws, _) = async_tungstenite::tokio::connect_async("ws://localhost:8545") + .await + .unwrap(); + let provider = Provider::new(Ws::new(ws)); + + let stream = provider + .watch_blocks() + .await + .unwrap() + .interval(2000u64) + .stream(); + + let _blocks = stream.take(3usize).collect::>().await; + let _number = provider.get_block_number().await.unwrap(); + } + + #[tokio::test] + #[serial] + async fn pending_txs_with_confirmations_ganache() { + let _ganache = Ganache::new().block_time(2u64).spawn(); + let provider = Provider::::try_from("http://localhost:8545").unwrap(); + let accounts = provider.get_accounts().await.unwrap(); + + let tx = TransactionRequest::pay(accounts[1], parse_ether(1u64).unwrap()).from(accounts[0]); + let pending_tx = provider.send_transaction(tx).await.unwrap(); + let hash = *pending_tx; + let receipt = pending_tx.confirmations(5).await.unwrap(); + + // got the correct receipt + assert_eq!(receipt.transaction_hash, hash); + } } #[cfg(feature = "celo")] diff --git a/ethers-signers/src/client.rs b/ethers-signers/src/client.rs index b23cd7a3..a907d07b 100644 --- a/ethers-signers/src/client.rs +++ b/ethers-signers/src/client.rs @@ -183,7 +183,10 @@ where /// calls. /// /// Clones internally. - pub fn with_signer(&self, signer: S) -> Self { + pub fn with_signer(&self, signer: S) -> Self + where + P: Clone, + { let mut this = self.clone(); this.signer = signer; this @@ -193,7 +196,10 @@ where /// calls. /// /// Clones internally. - pub fn with_provider(&self, provider: Provider

) -> Self { + pub fn with_provider(&self, provider: Provider

) -> Self + where + P: Clone, + { let mut this = self.clone(); this.provider = provider; this diff --git a/ethers/Cargo.toml b/ethers/Cargo.toml index f91e7b26..e55f9401 100644 --- a/ethers/Cargo.toml +++ b/ethers/Cargo.toml @@ -49,6 +49,7 @@ ethers-signers = { version = "0.1.3", path = "../ethers-signers", optional = tru [dev-dependencies] ethers-contract = { version = "0.1.3", path = "../ethers-contract", features = ["abigen"] } +ethers-providers = { version = "0.1.3", path = "../ethers-providers", features = ["tokio-runtime"] } anyhow = "1.0.31" rand = "0.7" diff --git a/ethers/examples/watch_blocks.rs b/ethers/examples/watch_blocks.rs index 25741fb8..bc65b138 100644 --- a/ethers/examples/watch_blocks.rs +++ b/ethers/examples/watch_blocks.rs @@ -1,9 +1,9 @@ use ethers::prelude::*; -use std::convert::TryFrom; #[tokio::main] async fn main() -> anyhow::Result<()> { - let provider = Provider::::try_from("http://localhost:8545")?; + let ws = Ws::connect("ws://localhost:8546").await?; + let provider = Provider::new(ws); let mut stream = provider.watch_blocks().await?.interval(2000u64).stream(); while let Some(block) = stream.next().await { dbg!(block);