ethers-rs/ethers-providers/tests/it/ws_errors.rs

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

67 lines
2.0 KiB
Rust
Raw Normal View History

ci/test: improve CI jobs and tests (#2189) * ci: move to scripts directory * nits * ci: improve main CI jobs * fix: install script * fix * fix: use curl for windows installation * fix: wasm typo * tests: move to single binary * chore: clippy * chore: clippy * chore: clippy * fix: test command * fix: quote tests * update script * fix: action exclude * fix: dev deps * fix: only run wasm in own job * ci: add aarch64 targets * test: rm useless test * ci: update security audit * ci: add deny CI * chore: rm unused audit.toml * chore: update geth.rs * ci: remove unusable targets * fix: install script path * fix: wasm * improve script * fix: failing ci * fix: contract tests * ci: improve install script * update middleware tests * move integration etherscan tests to tests/ dir * fix: eip2930 access_list field name * add pendingtransaction must_use * add random anvil comment * ci: add miri job * ci: simplify * fixci * Revert "add pendingtransaction must_use" This reverts commit 770b21b4a3c6ef8900a6aa1cd46aa9638317a60d. * fix: macos script * fix: use curl in script * unused ci * update script * fix wasm * rm_miri * fix: signer test * fix: wasm ci * fix: ipc test * fix: live celo tests * fix: abi online source test * fix: windows paths in test * chore: update serial_test * ci: run live tests separately * fix: provider tests * fix: unused var * fix: feature * fix merge * fix: etherscan key tests * ci: rm duplicate audit * fix: split etherscan test ci * fix: etherscan test * fix: generate multiple unused ports * fix: source test * fix: udeps * rm unused
2023-03-01 00:26:27 +00:00
use ethers_core::types::Filter;
use ethers_providers::{Middleware, Provider, StreamExt};
use futures_util::SinkExt;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{
accept_async,
tungstenite::{
self,
protocol::{frame::coding::CloseCode, CloseFrame},
Error,
},
};
use tungstenite::protocol::Message;
const WS_ENDPOINT: &str = "127.0.0.1:9002";
async fn spawn_ws_server() {
let listener = TcpListener::bind(&WS_ENDPOINT).await.expect("Can't listen");
tokio::spawn(async move {
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(handle_conn(stream));
}
});
}
async fn handle_conn(stream: TcpStream) -> Result<(), Error> {
let mut ws_stream = accept_async(stream).await?;
while ws_stream.next().await.is_some() {
let res: String =
"{\"jsonrpc\":\"2.0\",\"id\":0,\"result\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\"}"
.into();
// Answer with a valid RPC response to keep the connection alive
ws_stream.send(Message::Text(res)).await?;
// Wait for a while
let timeout = Duration::from_secs(2);
tokio::time::sleep(timeout).await;
// Drop the connection
ws_stream
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Error,
reason: "Upstream went away".into(),
})))
.await?;
}
Ok(())
}
#[tokio::test]
async fn graceful_disconnect_on_ws_errors() {
// Spawn a fake Ws server that will drop our connection after a while
spawn_ws_server().await;
// Connect to the fake server
let provider =
Provider::connect_with_reconnects(format!("ws://{WS_ENDPOINT}"), 1).await.unwrap();
let filter = Filter::new().event("Transfer(address,address,uint256)");
let mut stream = provider.subscribe_logs(&filter).await.unwrap();
assert!(stream.next().await.is_none());
}