use ethers_providers::{Middleware, Provider, StreamExt, Ws}; use futures_util::SinkExt; use std::time::Duration; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::{ accept_async, tungstenite::{ self, protocol::{frame::coding::CloseCode, CloseFrame}, Error, }, }; use tungstenite::protocol::Message; const WS_ENDPOINT: &str = "127.0.0.1:9002"; use ethers_core::types::Filter; use tokio_tungstenite::connect_async; #[tokio::test] async fn graceful_disconnect_on_ws_errors() { // Spawn a fake Ws server that will drop our connection after a while spawn_ws_server().await; // Connect to the fake server let (ws, _) = connect_async(format!("ws://{WS_ENDPOINT}")).await.unwrap(); let provider = Provider::new(Ws::new(ws)); let filter = Filter::new().event("Transfer(address,address,uint256)"); let mut stream = provider.subscribe_logs(&filter).await.unwrap(); assert!(stream.next().await.is_none()); } async fn spawn_ws_server() { let listener = TcpListener::bind(&WS_ENDPOINT).await.expect("Can't listen"); tokio::spawn(async move { while let Ok((stream, _)) = listener.accept().await { tokio::spawn(handle_conn(stream)); } }); } async fn handle_conn(stream: TcpStream) -> Result<(), Error> { let mut ws_stream = accept_async(stream).await?; while ws_stream.next().await.is_some() { let res: String = "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\"}" .into(); // Answer with a valid RPC response to keep the connection alive ws_stream.send(Message::Text(res)).await?; // Wait for a while let timeout = Duration::from_secs(2); tokio::time::sleep(timeout).await; // Drop the connection ws_stream .send(Message::Close(Some(CloseFrame { code: CloseCode::Error, reason: "Upstream went away".into(), }))) .await?; } Ok(()) }