From c4e09f261eca7a9e210d70cf11865e452b613cd7 Mon Sep 17 00:00:00 2001 From: Andrea Simeoni Date: Thu, 1 Dec 2022 20:00:10 +0100 Subject: [PATCH] Fix: handle panic on Ws error (#1915) * On Ws error close all active subscriptions and force clients to reconnect. the Websocket. * Comment typos * Unit tests Lint cargo +nightly fmt * - Added CHANGELOG entry - Added `#` prefix to issue IDs where missing * ownership typo Co-authored-by: Andrea Simeoni <> --- CHANGELOG.md | 31 ++++++----- ethers-providers/src/transports/ws.rs | 23 +++++--- ethers-providers/tests/ws_errors.rs | 79 +++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 24 deletions(-) create mode 100644 ethers-providers/tests/ws_errors.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index a3c817c3..74252ccc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,10 @@ ## ethers-core ### Unreleased +- Graceful handling of WebSocket transport errors [#1889](https://github.com/gakonst/ethers-rs/issues/1889) [#1815](https://github.com/gakonst/ethers-rs/issues/1815) - `MiddlewareBuilder` trait to instantiate a `Provider` as `Middleware` layers. - An `Event` builder can be instantiated specifying the event filter type, without the need to instantiate a contract. -- Add 'ethers_core::types::OpCode' and use in 'ethers_core::types::VMOperation' [1857](https://github.com/gakonst/ethers-rs/issues/1857) +- Add 'ethers_core::types::OpCode' and use in 'ethers_core::types::VMOperation' [#1857](https://github.com/gakonst/ethers-rs/issues/1857) - Remove rust_decimals dependency for ethers-core - Add support for numbers greater than 2^96 for `ethers_core::utils::parse_units` [#1822](https://github.com/gakonst/ethers-rs/issues/1822) - Add comment about safety of u8 -> u64 cast in `ethers_core::types::Signature` @@ -26,38 +27,38 @@ - Add `as_*_mut` methods on `TypedTransaction` [#1310](https://github.com/gakonst/ethers-rs/pull/1310) - AWS EIP712 data signing no longer signs with EIP155 -- Added Cronos testnet to etherscan options [1276](https://github.com/gakonst/ethers-rs/pull/1276) +- Added Cronos testnet to etherscan options [#1276](https://github.com/gakonst/ethers-rs/pull/1276) - Fix parsing of a pending block - [1272](https://github.com/gakonst/ethers-rs/pull/1272) + [#1272](https://github.com/gakonst/ethers-rs/pull/1272) - Removed Cronos mainnet beta from `is_legacy` [1246](https://github.com/gakonst/ethers-rs/pull/1246) - Fix RLP decoding of `from` field for `Eip1559TransactionRequest` and `Eip2930TransactionRequest`, remove `Eip1559TransactionRequest` `sighash` - method [1180](https://github.com/gakonst/ethers-rs/pull/1180) + method [#1180](https://github.com/gakonst/ethers-rs/pull/1180) - Fix RLP encoding of absent access list in `Transaction` [1137](https://github.com/gakonst/ethers-rs/pull/1137) -- Pass compilation time as additional argument to `Reporter::on_solc_success` [1098](https://github.com/gakonst/ethers-rs/pull/1098) +- Pass compilation time as additional argument to `Reporter::on_solc_success` [#1098](https://github.com/gakonst/ethers-rs/pull/1098) - Fix aws signer bug which maps un-normalized signature to error if no normalization occurs (in `aws::utils::decode_signature`) - Implement signed transaction RLP decoding [#1096](https://github.com/gakonst/ethers-rs/pull/1096) - `Transaction::from` will default to `Address::zero()`. Add `recover_from` and `recover_from_mut` methods for recovering the sender from signature, and also - setting the same on tx [1075](https://github.com/gakonst/ethers-rs/pull/1075). -- Add Etherscan account API endpoints [939](https://github.com/gakonst/ethers-rs/pull/939) + setting the same on tx [#1075](https://github.com/gakonst/ethers-rs/pull/1075). +- Add Etherscan account API endpoints [#939](https://github.com/gakonst/ethers-rs/pull/939) - Add FTM Mainet and testnet to parse method "try_from" from Chain.rs and add cronos mainet and testnet to "from_str" -- Add FTM mainnet and testnet Multicall addresses [927](https://github.com/gakonst/ethers-rs/pull/927) +- Add FTM mainnet and testnet Multicall addresses [#927](https://github.com/gakonst/ethers-rs/pull/927) - Add Cronos mainnet beta and testnet to the list of known chains - [926](https://github.com/gakonst/ethers-rs/pull/926) + [#926](https://github.com/gakonst/ethers-rs/pull/926) - `Chain::to_string` will return the same chain name as `Chain::from_str` -- Add `eth_syncing` [848](https://github.com/gakonst/ethers-rs/pull/848) +- Add `eth_syncing` [#848](https://github.com/gakonst/ethers-rs/pull/848) - Fix overflow and possible divide-by-zero in `estimate_priority_fee` - Add BSC mainnet and testnet to the list of known chains - [831](https://github.com/gakonst/ethers-rs/pull/831) + [#831](https://github.com/gakonst/ethers-rs/pull/831) - Returns error on invalid type conversion instead of panicking - [691](https://github.com/gakonst/ethers-rs/pull/691/files) + [#691](https://github.com/gakonst/ethers-rs/pull/691/files) - Change types mapping for solidity `bytes` to rust `ethers::core::Bytes` and solidity `uint8[]` to rust `Vec`. - [613](https://github.com/gakonst/ethers-rs/pull/613) + [#613](https://github.com/gakonst/ethers-rs/pull/613) - Fix `format_units` to return a `String` of representing a decimal point float such that the decimal places don't get truncated. - [597](https://github.com/gakonst/ethers-rs/pull/597) + [#597](https://github.com/gakonst/ethers-rs/pull/597) - Implement hex display format for `ethers::core::Bytes` [#624](https://github.com/gakonst/ethers-rs/pull/624). - Fix `fee_history` to first try with `block_count` encoded as a hex `QUANTITY`. @@ -83,7 +84,7 @@ - Add a getter to `ProjectCompileOutput` that returns a mapping of compiler versions to a vector of name + contract struct tuples [#908](https://github.com/gakonst/ethers-rs/pull/908) -- Add Yul compilation [994](https://github.com/gakonst/ethers-rs/pull/994) +- Add Yul compilation [#994](https://github.com/gakonst/ethers-rs/pull/994) - Enforce commutativity of ENS reverse resolution [#996](https://github.com/gakonst/ethers-rs/pull/996) - Add `TransactionReceipt::to` and `TransactionReceipt::from` diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs index d05970ec..696c51da 100644 --- a/ethers-providers/src/transports/ws.rs +++ b/ethers-providers/src/transports/ws.rs @@ -246,15 +246,11 @@ where debug!("work complete"); break } - match self.tick().await { - Err(ClientError::UnexpectedClose) => { - error!("{}", ClientError::UnexpectedClose); - break - } - Err(e) => { - panic!("WS Server panic: {}", e); - } - _ => {} + + if let Err(e) = self.tick().await { + error!("Received a WebSocket error: {:?}", e); + self.close_all_subscriptions(); + break } } }; @@ -266,6 +262,15 @@ where tokio::spawn(f); } + // This will close all active subscriptions. Each process listening for + // updates will observe the end of their subscription streams. + fn close_all_subscriptions(&self) { + error!("Tearing down subscriptions"); + for (_, sub) in self.subscriptions.iter() { + sub.close_channel(); + } + } + // dispatch an RPC request async fn service_request( &mut self, diff --git a/ethers-providers/tests/ws_errors.rs b/ethers-providers/tests/ws_errors.rs new file mode 100644 index 00000000..812b3dd4 --- /dev/null +++ b/ethers-providers/tests/ws_errors.rs @@ -0,0 +1,79 @@ +#![cfg(not(target_arch = "wasm32"))] +use ethers_providers::{Middleware, Provider, StreamExt}; +use futures_util::SinkExt; +use std::time::Duration; +use tokio::net::{TcpListener, TcpStream}; +use tokio_tungstenite::{ + accept_async, + tungstenite::{ + self, + protocol::{frame::coding::CloseCode, CloseFrame}, + Error, + }, +}; +use tungstenite::protocol::Message; + +const WS_ENDPOINT: &str = "127.0.0.1:9002"; + +#[cfg(not(feature = "celo"))] +mod eth_tests { + use ethers_core::types::Filter; + use ethers_providers::{StreamExt, Ws}; + use tokio_tungstenite::connect_async; + + use super::*; + + #[tokio::test] + async fn graceful_disconnect_on_ws_errors() { + // Spawn a fake Ws server that will drop our connection after a while + spawn_ws_server().await; + + // Connect to the fake server + let (ws, _) = connect_async(format!("ws://{}", WS_ENDPOINT)).await.unwrap(); + let provider = Provider::new(Ws::new(ws)); + let filter = Filter::new().event("Transfer(address,address,uint256)"); + let mut stream = provider.subscribe_logs(&filter).await.unwrap(); + + while let Some(_) = stream.next().await { + assert!(false); // force test to fail + } + + assert!(true); + } +} + +async fn spawn_ws_server() { + let listener = TcpListener::bind(&WS_ENDPOINT).await.expect("Can't listen"); + tokio::spawn(async move { + while let Ok((stream, _)) = listener.accept().await { + tokio::spawn(handle_conn(stream)); + } + }); +} + +async fn handle_conn(stream: TcpStream) -> Result<(), Error> { + let mut ws_stream = accept_async(stream).await?; + + while let Some(_) = ws_stream.next().await { + let res: String = + "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\"}" + .into(); + + // Answer with a valid RPC response to keep the connection alive + ws_stream.send(Message::Text(res)).await?; + + // Wait for a while + let timeout = Duration::from_secs(2); + tokio::time::sleep(timeout).await; + + // Drop the connection + ws_stream + .send(Message::Close(Some(CloseFrame { + code: CloseCode::Error, + reason: "Upstream went away".into(), + }))) + .await?; + } + + Ok(()) +}