diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs index 5d029aa1..5a6c1926 100644 --- a/ethers-providers/src/transports/ws.rs +++ b/ethers-providers/src/transports/ws.rs @@ -12,6 +12,7 @@ use futures_util::{ stream::{Fuse, Stream, StreamExt}, }; use serde::{de::DeserializeOwned, Serialize}; +use std::collections::btree_map::Entry; use std::{ collections::BTreeMap, fmt::{self, Debug}, @@ -198,6 +199,14 @@ where } } + /// Returns whether the all work has been completed. + /// + /// If this method returns `true`, then the `instructions` channel has been closed and all + /// pending requests and subscriptions have been completed. + fn is_done(&self) -> bool { + self.instructions.is_done() && self.pending.is_empty() && self.subscriptions.is_empty() + } + /// Spawns the event loop fn spawn(mut self) where @@ -205,6 +214,10 @@ where { let f = async move { loop { + if self.is_done() { + tracing::info!("work complete"); + break; + } match self.tick().await { Err(ClientError::UnexpectedClose) => { tracing::error!("{}", ClientError::UnexpectedClose); @@ -288,10 +301,14 @@ where } Ok(Incoming::Notification(notification)) => { let id = notification.params.subscription; - if let Some(stream) = self.subscriptions.get(&id) { - stream - .unbounded_send(notification.params.result) - .map_err(to_client_error)?; + if let Entry::Occupied(stream) = self.subscriptions.entry(id) { + if let Err(err) = stream.get().unbounded_send(notification.params.result) { + if err.is_disconnected() { + // subscription channel was closed on the receiver end + stream.remove(); + } + return Err(to_client_error(err)); + } } } }