fix: drop WS server if work complete (#399)
* fix: drop WS server if work complete * fix: remove closed subscriptions * fix: handle error correctly * style: better notification handling
This commit is contained in:
parent
8891ed38b4
commit
bfbbee50cf
|
@ -12,6 +12,7 @@ use futures_util::{
|
||||||
stream::{Fuse, Stream, StreamExt},
|
stream::{Fuse, Stream, StreamExt},
|
||||||
};
|
};
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
|
use std::collections::btree_map::Entry;
|
||||||
use std::{
|
use std::{
|
||||||
collections::BTreeMap,
|
collections::BTreeMap,
|
||||||
fmt::{self, Debug},
|
fmt::{self, Debug},
|
||||||
|
@ -198,6 +199,14 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns whether the all work has been completed.
|
||||||
|
///
|
||||||
|
/// If this method returns `true`, then the `instructions` channel has been closed and all
|
||||||
|
/// pending requests and subscriptions have been completed.
|
||||||
|
fn is_done(&self) -> bool {
|
||||||
|
self.instructions.is_done() && self.pending.is_empty() && self.subscriptions.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawns the event loop
|
/// Spawns the event loop
|
||||||
fn spawn(mut self)
|
fn spawn(mut self)
|
||||||
where
|
where
|
||||||
|
@ -205,6 +214,10 @@ where
|
||||||
{
|
{
|
||||||
let f = async move {
|
let f = async move {
|
||||||
loop {
|
loop {
|
||||||
|
if self.is_done() {
|
||||||
|
tracing::info!("work complete");
|
||||||
|
break;
|
||||||
|
}
|
||||||
match self.tick().await {
|
match self.tick().await {
|
||||||
Err(ClientError::UnexpectedClose) => {
|
Err(ClientError::UnexpectedClose) => {
|
||||||
tracing::error!("{}", ClientError::UnexpectedClose);
|
tracing::error!("{}", ClientError::UnexpectedClose);
|
||||||
|
@ -288,10 +301,14 @@ where
|
||||||
}
|
}
|
||||||
Ok(Incoming::Notification(notification)) => {
|
Ok(Incoming::Notification(notification)) => {
|
||||||
let id = notification.params.subscription;
|
let id = notification.params.subscription;
|
||||||
if let Some(stream) = self.subscriptions.get(&id) {
|
if let Entry::Occupied(stream) = self.subscriptions.entry(id) {
|
||||||
stream
|
if let Err(err) = stream.get().unbounded_send(notification.params.result) {
|
||||||
.unbounded_send(notification.params.result)
|
if err.is_disconnected() {
|
||||||
.map_err(to_client_error)?;
|
// subscription channel was closed on the receiver end
|
||||||
|
stream.remove();
|
||||||
|
}
|
||||||
|
return Err(to_client_error(err));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue