From 0eee674ba05436d8c53710901c6fb66e38973922 Mon Sep 17 00:00:00 2001 From: James Prestwich <10149425+prestwich@users.noreply.github.com> Date: Fri, 20 Aug 2021 09:58:13 -0700 Subject: [PATCH] feature: drop WS server on when WS connection closes (#396) * feature: drop WS server on when WS connection closes * feature: ready function on WS * feature: ws now handles all message types * bug: SubscriptionStream ends if its sender drops --- Cargo.lock | 56 +++++++++++++-------------- ethers-providers/Cargo.toml | 4 +- ethers-providers/src/pubsub.rs | 12 ++++-- ethers-providers/src/transports/ws.rs | 52 ++++++++++++++++++------- 4 files changed, 78 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2647fbfc..f5469e2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1313,9 +1313,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.4.1" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" +checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" [[package]] name = "httpdate" @@ -1448,9 +1448,9 @@ checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" [[package]] name = "js-sys" -version = "0.3.52" +version = "0.3.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce791b7ca6638aae45be056e068fc756d871eb3b3b10b8efa62d1c9cec616752" +checksum = "e4bf49d50e2961077d9c99f4b7997d770a1114f087c3c2e0069b36c13fc2979d" dependencies = [ "wasm-bindgen", ] @@ -1535,9 +1535,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" [[package]] name = "mime" @@ -1648,9 +1648,9 @@ dependencies = [ [[package]] name = "object" -version = "0.26.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c55827317fb4c08822499848a14237d2874d6f139828893017237e7ab93eb386" +checksum = "ee2766204889d09937d00bfbb7fec56bb2a199e2ade963cab19185d8a6104c7c" dependencies = [ "memchr", ] @@ -1675,9 +1675,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.35" +version = "0.10.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "549430950c79ae24e6d02e0b7404534ecf311d94cc9f861e9e4020187d13d885" +checksum = "8d9facdb76fec0b73c406f125d44d86fdad818d66fef0531eec9233ca425ff4a" dependencies = [ "bitflags", "cfg-if 1.0.0", @@ -1695,9 +1695,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "openssl-sys" -version = "0.9.65" +version = "0.9.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d" +checksum = "1996d2d305e561b70d1ee0c53f1542833f4e1ac6ce9a6708b6ff2738ca67dc82" dependencies = [ "autocfg", "cc", @@ -2796,9 +2796,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9ff14f98b1a4b289c6248a023c1c2fa1491062964e9fed67ab29c4e4da4a052" +checksum = "2ca517f43f0fb96e0c3072ed5c275fe5eece87e8cb52f4a77b69226d3b1c9df8" dependencies = [ "lazy_static", ] @@ -3000,9 +3000,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.75" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b608ecc8f4198fe8680e2ed18eccab5f0cd4caaf3d83516fa5fb2e927fda2586" +checksum = "8ce9b1b516211d33767048e5d47fa2a381ed8b76fc48d2ce4aa39877f9f183e0" dependencies = [ "cfg-if 1.0.0", "serde", @@ -3012,9 +3012,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.75" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "580aa3a91a63d23aac5b6b267e2d13cb4f363e31dce6c352fca4752ae12e479f" +checksum = "cfe8dc78e2326ba5f845f4b5bf548401604fa20b1dd1d365fb73b6c1d6364041" dependencies = [ "bumpalo", "lazy_static", @@ -3027,9 +3027,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.25" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16646b21c3add8e13fdb8f20172f8a28c3dbf62f45406bcff0233188226cfe0c" +checksum = "95fded345a6559c2cfee778d562300c581f7d4ff3edb9b0d230d69800d213972" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -3039,9 +3039,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.75" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171ebf0ed9e1458810dfcb31f2e766ad6b3a89dbda42d8901f2b268277e5f09c" +checksum = "44468aa53335841d9d6b6c023eaab07c0cd4bddbcfdee3e2bb1e8d2cb8069fef" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3049,9 +3049,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.75" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c2657dd393f03aa2a659c25c6ae18a13a4048cebd220e147933ea837efc589f" +checksum = "0195807922713af1e67dc66132c7328206ed9766af3858164fb583eedc25fbad" dependencies = [ "proc-macro2", "quote", @@ -3062,15 +3062,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.75" +version = "0.2.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e0c4a743a309662d45f4ede961d7afa4ba4131a59a639f29b0069c3798bbcc2" +checksum = "acdb075a845574a1fa5f09fd77e43f7747599301ea3417a9fbffdeedfc1f4a29" [[package]] name = "web-sys" -version = "0.3.52" +version = "0.3.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01c70a82d842c9979078c772d4a1344685045f1a5628f677c2b2eab4dd7d2696" +checksum = "224b2f6b67919060055ef1a67807367c2066ed520c3862cc013d26cf893a783c" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 6403297e..dcb678e4 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -26,10 +26,10 @@ url = { version = "2.2.2", default-features = false } auto_impl = { version = "0.4.1", default-features = false } # required for implementing stream on the filters -futures-core = { version = "0.3.12", default-features = false } +futures-core = { version = "0.3.16", default-features = false } futures-util = { version = "0.3.16" } futures-timer = { version = "3.0.2", default-features = false } -futures-channel = { version = "0.3.13", default-features = false } +futures-channel = { version = "0.3.16", default-features = false } pin-project = { version = "1.0.7", default-features = false } # tracing diff --git a/ethers-providers/src/pubsub.rs b/ethers-providers/src/pubsub.rs index f3a1dcb2..668704cc 100644 --- a/ethers-providers/src/pubsub.rs +++ b/ethers-providers/src/pubsub.rs @@ -44,7 +44,13 @@ where P: PubsubClient, R: DeserializeOwned, { - /// Creates a new subscription stream for the provided subscription id + /// Creates a new subscription stream for the provided subscription id. + /// + /// ### Note + /// Most providers treat `SubscriptionStream` IDs as global singletons. + /// Instanitating this directly with a known ID will likely cause any + /// existing streams with that ID to end. To avoid this, start a new stream + /// using [`Provider::subscribe`] instead of `SubscriptionStream::new`. pub fn new(id: U256, provider: &'a Provider

) -> Result { // Call the underlying PubsubClient's subscribe let rx = provider.as_ref().subscribe(id)?; @@ -56,7 +62,7 @@ where }) } - /// Unsubscribes from the subscription + /// Unsubscribes from the subscription. pub async fn unsubscribe(&self) -> Result { self.provider.unsubscribe(self.id).await } @@ -79,7 +85,7 @@ where Ok(res) => Poll::Ready(Some(res)), _ => Poll::Pending, }, - None => Poll::Pending, + None => Poll::Ready(None), } } } diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs index f8f1c2c8..38b9c7aa 100644 --- a/ethers-providers/src/transports/ws.rs +++ b/ethers-providers/src/transports/ws.rs @@ -23,7 +23,10 @@ use std::{ use thiserror::Error; use tokio_tungstenite::{ connect_async, - tungstenite::{self, protocol::Message}, + tungstenite::{ + self, + protocol::{CloseFrame, Message}, + }, }; use tracing::{error, warn}; @@ -91,6 +94,11 @@ impl Ws { } } + /// Returns true if the WS connection is active, false otherwise + pub fn ready(&self) -> bool { + !self.requests.is_closed() + } + /// Initializes a new WebSocket Client pub async fn connect( url: impl tungstenite::client::IntoClientRequest + Unpin, @@ -189,7 +197,16 @@ where { let f = async move { loop { - self.process().await.expect("WS Server panic"); + match self.process().await { + Err(ClientError::UnexpectedClose) => { + tracing::error!("{}", ClientError::UnexpectedClose); + break; + } + Err(_) => { + panic!("WS Server panic"); + } + _ => {} + } } }; @@ -201,19 +218,18 @@ where async fn process(&mut self) -> Result<(), ClientError> { futures_util::select! { // Handle requests - msg = self.requests.next() => match msg { - Some(msg) => self.handle_request(msg).await?, - None => {}, + msg = self.requests.select_next_some() => { + self.handle_request(msg).await?; }, // Handle ws messages msg = self.ws.next() => match msg { Some(Ok(msg)) => self.handle_ws(msg).await?, // TODO: Log the error? Some(Err(_)) => {}, - None => {}, - }, - // finished - complete => {}, + None => { + return Err(ClientError::UnexpectedClose); + }, + } }; Ok(()) @@ -258,7 +274,9 @@ where Message::Text(inner) => self.handle_text(inner).await, Message::Ping(inner) => self.handle_ping(inner).await, Message::Pong(_) => Ok(()), // Server is allowed to send unsolicited pongs. - _ => Err(ClientError::NoResponse), + Message::Close(Some(frame)) => Err(ClientError::WsClosed(frame)), + Message::Close(None) => Err(ClientError::UnexpectedClose), + Message::Binary(buf) => Err(ClientError::UnexpectedBinary(buf)), } } @@ -304,9 +322,9 @@ pub enum ClientError { /// Thrown if the response could not be parsed JsonRpcError(#[from] JsonRpcError), - /// Thrown if the websocket didn't respond to our message - #[error("Websocket connection did not respond with text data")] - NoResponse, + /// Thrown if the websocket responds with binary data + #[error("Websocket responded with unexpected binary data")] + UnexpectedBinary(Vec), /// Thrown if there's an error over the WS connection #[error(transparent)] @@ -317,6 +335,14 @@ pub enum ClientError { #[error(transparent)] Canceled(#[from] oneshot::Canceled), + + /// Remote server sent a Close message + #[error("Websocket closed with info: {0:?}")] + WsClosed(CloseFrame<'static>), + + /// Something caused the websocket to close + #[error("WebSocket connection closed unexpectedly")] + UnexpectedClose, } impl From for ProviderError {