fix(ws): propagate deserialization errors upstream + infura quickfix (#827)

* fix(ws): propagate error when deserialization fails

* fix(ws): tolerate case-insensitive match on the jsonrpc field

Infura streams notifications with jsonrpc all capitalized. This fails to
deserialize on our end because serde expect a precise match ressembling
the Notification jsonrpc field.

Fixes #684.

* chore(fmt): use nightly rustfmt

* chore: fmt

* chore: fix failing celo test

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Erwan 2022-01-24 14:38:00 +01:00 committed by GitHub
parent af94479d81
commit f9f77e829b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 4 deletions

View File

@ -39,6 +39,7 @@ pub struct Request<'a, T> {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
/// A JSON-RPC Notifcation /// A JSON-RPC Notifcation
pub struct Notification<R> { pub struct Notification<R> {
#[serde(alias = "JSONRPC")]
jsonrpc: String, jsonrpc: String,
method: String, method: String,
pub params: Subscription<R>, pub params: Subscription<R>,

View File

@ -112,7 +112,6 @@ impl Ws {
S: Send + Sync + Stream<Item = WsStreamItem> + Sink<Message, Error = WsError> + Unpin, S: Send + Sync + Stream<Item = WsStreamItem> + Sink<Message, Error = WsError> + Unpin,
{ {
let (sink, stream) = mpsc::unbounded(); let (sink, stream) = mpsc::unbounded();
// Spawn the server // Spawn the server
WsServer::new(ws, stream).spawn(); WsServer::new(ws, stream).spawn();
@ -310,7 +309,8 @@ where
async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> { async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> {
match serde_json::from_str::<Incoming>(&inner) { match serde_json::from_str::<Incoming>(&inner) {
Err(_) => {} Err(err) => return Err(ClientError::JsonError(err)),
Ok(Incoming::Response(resp)) => { Ok(Incoming::Response(resp)) => {
if let Some(request) = self.pending.remove(&resp.id) { if let Some(request) = self.pending.remove(&resp.id) {
if !request.is_canceled() { if !request.is_canceled() {
@ -491,4 +491,14 @@ mod tests {
assert_eq!(sub_id, 1.into()); assert_eq!(sub_id, 1.into());
assert_eq!(blocks, vec![1, 2, 3]) assert_eq!(blocks, vec![1, 2, 3])
} }
#[tokio::test]
async fn deserialization_fails() {
let ganache = Ganache::new().block_time(1u64).spawn();
let (ws, _) = tokio_tungstenite::connect_async(ganache.ws_endpoint()).await.unwrap();
let malformed_data = String::from("not a valid message");
let (_, stream) = mpsc::unbounded();
let resp = WsServer::new(ws, stream).handle_text(malformed_data).await;
assert!(resp.is_err(), "Deserialization should not fail silently");
}
} }

View File

@ -117,14 +117,14 @@ mod celo_tests {
let provider = let provider =
Provider::<Http>::try_from("https://alfajores-forno.celo-testnet.org").unwrap(); Provider::<Http>::try_from("https://alfajores-forno.celo-testnet.org").unwrap();
let tx_hash = "d3d27aa4517124d9ff3ac6f1d8f248e0fe47b6f841b625722546162672ac24c7" let tx_hash = "a8e1d4b9e245a67fafc7c516ff844c2615cc6419d53560e7f358b124e4ce5e1d"
.parse::<H256>() .parse::<H256>()
.unwrap(); .unwrap();
let tx = provider.get_transaction(tx_hash).await.unwrap().unwrap(); let tx = provider.get_transaction(tx_hash).await.unwrap().unwrap();
assert!(tx.gateway_fee_recipient.is_none()); assert!(tx.gateway_fee_recipient.is_none());
assert_eq!(tx.gateway_fee.unwrap(), 0.into()); assert_eq!(tx.gateway_fee.unwrap(), 0.into());
assert_eq!(tx.hash, tx_hash); assert_eq!(tx.hash, tx_hash);
assert_eq!(tx.block_number.unwrap(), 9401421.into()) assert_eq!(tx.block_number.unwrap(), 9534852.into())
} }
#[tokio::test] #[tokio::test]