feature: drop WS server on when WS connection closes (#396)

* feature: drop WS server on when WS connection closes

* feature: ready function on WS

* feature: ws now handles all message types

* bug: SubscriptionStream ends if its sender drops
This commit is contained in:
James Prestwich 2021-08-20 09:58:13 -07:00 committed by GitHub
parent ab0e3ca0d6
commit 0eee674ba0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 46 deletions

56
Cargo.lock generated
View File

@ -1313,9 +1313,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.4.1"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68"
checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
[[package]]
name = "httpdate"
@ -1448,9 +1448,9 @@ checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "js-sys"
version = "0.3.52"
version = "0.3.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce791b7ca6638aae45be056e068fc756d871eb3b3b10b8efa62d1c9cec616752"
checksum = "e4bf49d50e2961077d9c99f4b7997d770a1114f087c3c2e0069b36c13fc2979d"
dependencies = [
"wasm-bindgen",
]
@ -1535,9 +1535,9 @@ dependencies = [
[[package]]
name = "memchr"
version = "2.4.0"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "mime"
@ -1648,9 +1648,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.26.0"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c55827317fb4c08822499848a14237d2874d6f139828893017237e7ab93eb386"
checksum = "ee2766204889d09937d00bfbb7fec56bb2a199e2ade963cab19185d8a6104c7c"
dependencies = [
"memchr",
]
@ -1675,9 +1675,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.35"
version = "0.10.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "549430950c79ae24e6d02e0b7404534ecf311d94cc9f861e9e4020187d13d885"
checksum = "8d9facdb76fec0b73c406f125d44d86fdad818d66fef0531eec9233ca425ff4a"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
@ -1695,9 +1695,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a"
[[package]]
name = "openssl-sys"
version = "0.9.65"
version = "0.9.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d"
checksum = "1996d2d305e561b70d1ee0c53f1542833f4e1ac6ce9a6708b6ff2738ca67dc82"
dependencies = [
"autocfg",
"cc",
@ -2796,9 +2796,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.18"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9ff14f98b1a4b289c6248a023c1c2fa1491062964e9fed67ab29c4e4da4a052"
checksum = "2ca517f43f0fb96e0c3072ed5c275fe5eece87e8cb52f4a77b69226d3b1c9df8"
dependencies = [
"lazy_static",
]
@ -3000,9 +3000,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.75"
version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b608ecc8f4198fe8680e2ed18eccab5f0cd4caaf3d83516fa5fb2e927fda2586"
checksum = "8ce9b1b516211d33767048e5d47fa2a381ed8b76fc48d2ce4aa39877f9f183e0"
dependencies = [
"cfg-if 1.0.0",
"serde",
@ -3012,9 +3012,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.75"
version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "580aa3a91a63d23aac5b6b267e2d13cb4f363e31dce6c352fca4752ae12e479f"
checksum = "cfe8dc78e2326ba5f845f4b5bf548401604fa20b1dd1d365fb73b6c1d6364041"
dependencies = [
"bumpalo",
"lazy_static",
@ -3027,9 +3027,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.25"
version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16646b21c3add8e13fdb8f20172f8a28c3dbf62f45406bcff0233188226cfe0c"
checksum = "95fded345a6559c2cfee778d562300c581f7d4ff3edb9b0d230d69800d213972"
dependencies = [
"cfg-if 1.0.0",
"js-sys",
@ -3039,9 +3039,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.75"
version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "171ebf0ed9e1458810dfcb31f2e766ad6b3a89dbda42d8901f2b268277e5f09c"
checksum = "44468aa53335841d9d6b6c023eaab07c0cd4bddbcfdee3e2bb1e8d2cb8069fef"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@ -3049,9 +3049,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.75"
version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2657dd393f03aa2a659c25c6ae18a13a4048cebd220e147933ea837efc589f"
checksum = "0195807922713af1e67dc66132c7328206ed9766af3858164fb583eedc25fbad"
dependencies = [
"proc-macro2",
"quote",
@ -3062,15 +3062,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.75"
version = "0.2.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e0c4a743a309662d45f4ede961d7afa4ba4131a59a639f29b0069c3798bbcc2"
checksum = "acdb075a845574a1fa5f09fd77e43f7747599301ea3417a9fbffdeedfc1f4a29"
[[package]]
name = "web-sys"
version = "0.3.52"
version = "0.3.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01c70a82d842c9979078c772d4a1344685045f1a5628f677c2b2eab4dd7d2696"
checksum = "224b2f6b67919060055ef1a67807367c2066ed520c3862cc013d26cf893a783c"
dependencies = [
"js-sys",
"wasm-bindgen",

View File

@ -26,10 +26,10 @@ url = { version = "2.2.2", default-features = false }
auto_impl = { version = "0.4.1", default-features = false }
# required for implementing stream on the filters
futures-core = { version = "0.3.12", default-features = false }
futures-core = { version = "0.3.16", default-features = false }
futures-util = { version = "0.3.16" }
futures-timer = { version = "3.0.2", default-features = false }
futures-channel = { version = "0.3.13", default-features = false }
futures-channel = { version = "0.3.16", default-features = false }
pin-project = { version = "1.0.7", default-features = false }
# tracing

View File

@ -44,7 +44,13 @@ where
P: PubsubClient,
R: DeserializeOwned,
{
/// Creates a new subscription stream for the provided subscription id
/// Creates a new subscription stream for the provided subscription id.
///
/// ### Note
/// Most providers treat `SubscriptionStream` IDs as global singletons.
/// Instanitating this directly with a known ID will likely cause any
/// existing streams with that ID to end. To avoid this, start a new stream
/// using [`Provider::subscribe`] instead of `SubscriptionStream::new`.
pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
// Call the underlying PubsubClient's subscribe
let rx = provider.as_ref().subscribe(id)?;
@ -56,7 +62,7 @@ where
})
}
/// Unsubscribes from the subscription
/// Unsubscribes from the subscription.
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await
}
@ -79,7 +85,7 @@ where
Ok(res) => Poll::Ready(Some(res)),
_ => Poll::Pending,
},
None => Poll::Pending,
None => Poll::Ready(None),
}
}
}

View File

@ -23,7 +23,10 @@ use std::{
use thiserror::Error;
use tokio_tungstenite::{
connect_async,
tungstenite::{self, protocol::Message},
tungstenite::{
self,
protocol::{CloseFrame, Message},
},
};
use tracing::{error, warn};
@ -91,6 +94,11 @@ impl Ws {
}
}
/// Returns true if the WS connection is active, false otherwise
pub fn ready(&self) -> bool {
!self.requests.is_closed()
}
/// Initializes a new WebSocket Client
pub async fn connect(
url: impl tungstenite::client::IntoClientRequest + Unpin,
@ -189,7 +197,16 @@ where
{
let f = async move {
loop {
self.process().await.expect("WS Server panic");
match self.process().await {
Err(ClientError::UnexpectedClose) => {
tracing::error!("{}", ClientError::UnexpectedClose);
break;
}
Err(_) => {
panic!("WS Server panic");
}
_ => {}
}
}
};
@ -201,19 +218,18 @@ where
async fn process(&mut self) -> Result<(), ClientError> {
futures_util::select! {
// Handle requests
msg = self.requests.next() => match msg {
Some(msg) => self.handle_request(msg).await?,
None => {},
msg = self.requests.select_next_some() => {
self.handle_request(msg).await?;
},
// Handle ws messages
msg = self.ws.next() => match msg {
Some(Ok(msg)) => self.handle_ws(msg).await?,
// TODO: Log the error?
Some(Err(_)) => {},
None => {},
},
// finished
complete => {},
None => {
return Err(ClientError::UnexpectedClose);
},
}
};
Ok(())
@ -258,7 +274,9 @@ where
Message::Text(inner) => self.handle_text(inner).await,
Message::Ping(inner) => self.handle_ping(inner).await,
Message::Pong(_) => Ok(()), // Server is allowed to send unsolicited pongs.
_ => Err(ClientError::NoResponse),
Message::Close(Some(frame)) => Err(ClientError::WsClosed(frame)),
Message::Close(None) => Err(ClientError::UnexpectedClose),
Message::Binary(buf) => Err(ClientError::UnexpectedBinary(buf)),
}
}
@ -304,9 +322,9 @@ pub enum ClientError {
/// Thrown if the response could not be parsed
JsonRpcError(#[from] JsonRpcError),
/// Thrown if the websocket didn't respond to our message
#[error("Websocket connection did not respond with text data")]
NoResponse,
/// Thrown if the websocket responds with binary data
#[error("Websocket responded with unexpected binary data")]
UnexpectedBinary(Vec<u8>),
/// Thrown if there's an error over the WS connection
#[error(transparent)]
@ -317,6 +335,14 @@ pub enum ClientError {
#[error(transparent)]
Canceled(#[from] oneshot::Canceled),
/// Remote server sent a Close message
#[error("Websocket closed with info: {0:?}")]
WsClosed(CloseFrame<'static>),
/// Something caused the websocket to close
#[error("WebSocket connection closed unexpectedly")]
UnexpectedClose,
}
impl From<ClientError> for ProviderError {