diff --git a/ethers-providers/Cargo.toml b/ethers-providers/Cargo.toml index 74299cd6..70eba29f 100644 --- a/ethers-providers/Cargo.toml +++ b/ethers-providers/Cargo.toml @@ -20,7 +20,7 @@ async-trait = { version = "0.1.50", default-features = false } hex = { version = "0.4.3", default-features = false, features = ["std"] } reqwest = { version = "0.11.10", default-features = false, features = ["json"] } serde = { version = "1.0.124", default-features = false, features = ["derive"] } -serde_json = { version = "1.0.64", default-features = false } +serde_json = { version = "1.0.64", default-features = false, features = ["raw_value"] } thiserror = { version = "1.0.30", default-features = false } url = { version = "2.2.2", default-features = false } auto_impl = { version = "0.5.0", default-features = false } diff --git a/ethers-providers/src/pubsub.rs b/ethers-providers/src/pubsub.rs index 831cd956..9aed8252 100644 --- a/ethers-providers/src/pubsub.rs +++ b/ethers-providers/src/pubsub.rs @@ -5,7 +5,7 @@ use ethers_core::types::{TxHash, U256}; use futures_util::stream::Stream; use pin_project::{pin_project, pinned_drop}; use serde::de::DeserializeOwned; -use serde_json::Value; +use serde_json::value::RawValue; use std::{ marker::PhantomData, pin::Pin, @@ -15,7 +15,7 @@ use std::{ /// A transport implementation supporting pub sub subscriptions. pub trait PubsubClient: JsonRpcClient { /// The type of stream this transport returns - type NotificationStream: futures_core::Stream + Send + Unpin; + type NotificationStream: futures_core::Stream> + Send + Unpin; /// Add a subscription to this transport fn subscribe>(&self, id: T) -> Result; @@ -76,7 +76,7 @@ where fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { let this = self.project(); match futures_util::ready!(this.rx.poll_next(ctx)) { - Some(item) => match serde_json::from_value(item) { + Some(item) => match serde_json::from_str(item.get()) { Ok(res) => Poll::Ready(Some(res)), _ => Poll::Pending, }, diff --git a/ethers-providers/src/transports/common.rs b/ethers-providers/src/transports/common.rs index 6162bde6..840d4020 100644 --- a/ethers-providers/src/transports/common.rs +++ b/ethers-providers/src/transports/common.rs @@ -1,11 +1,16 @@ // Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc -use ethers_core::types::U256; -use serde::{Deserialize, Serialize}; -use serde_json::Value; use std::fmt; + +use serde::{ + de::{self, MapAccess, Unexpected, Visitor}, + Deserialize, Serialize, +}; +use serde_json::{value::RawValue, Value}; use thiserror::Error; -#[derive(Serialize, Deserialize, Debug, Clone, Error)] +use ethers_core::types::U256; + +#[derive(Deserialize, Debug, Clone, Error)] /// A JSON-RPC 2.0 error pub struct JsonRpcError { /// The error code @@ -36,21 +41,6 @@ pub struct Request<'a, T> { params: T, } -#[derive(Serialize, Deserialize, Debug)] -/// A JSON-RPC Notifcation -pub struct Notification { - #[serde(alias = "JSONRPC")] - jsonrpc: String, - method: String, - pub params: Subscription, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct Subscription { - pub subscription: U256, - pub result: R, -} - impl<'a, T> Request<'a, T> { /// Creates a new JSON RPC request pub fn new(id: u64, method: &'a str, params: T) -> Self { @@ -58,39 +48,134 @@ impl<'a, T> Request<'a, T> { } } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Response { - pub(crate) id: u64, - jsonrpc: String, - #[serde(flatten)] - pub data: ResponseData, +/// A JSON-RPC Notifcation +#[allow(unused)] +#[derive(Deserialize, Debug)] +pub struct Notification<'a> { + #[serde(alias = "JSONRPC")] + jsonrpc: &'a str, + method: &'a str, + #[serde(borrow)] + pub params: Subscription<'a>, } -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(untagged)] -pub enum ResponseData { - Error { error: JsonRpcError }, - Success { result: R }, +#[derive(Deserialize, Debug)] +pub struct Subscription<'a> { + pub subscription: U256, + #[serde(borrow)] + pub result: &'a RawValue, } -impl ResponseData { - /// Consume response and return value - pub fn into_result(self) -> Result { +#[derive(Debug)] +pub enum Response<'a> { + Success { id: u64, jsonrpc: &'a str, result: &'a RawValue }, + Error { id: u64, jsonrpc: &'a str, error: JsonRpcError }, +} + +impl Response<'_> { + pub fn id(&self) -> u64 { match self { - ResponseData::Success { result } => Ok(result), - ResponseData::Error { error } => Err(error), + Self::Success { id, .. } => *id, + Self::Error { id, .. } => *id, + } + } + + pub fn as_result(&self) -> Result<&RawValue, &JsonRpcError> { + match self { + Self::Success { result, .. } => Ok(*result), + Self::Error { error, .. } => Err(error), + } + } + + pub fn into_result(self) -> Result, JsonRpcError> { + match self { + Self::Success { result, .. } => Ok(result.to_owned()), + Self::Error { error, .. } => Err(error), } } } -impl ResponseData { - /// Encode the error to json value if it is an error - #[allow(dead_code)] - pub fn into_value(self) -> serde_json::Result { - match self { - ResponseData::Success { result } => Ok(result), - ResponseData::Error { error } => serde_json::to_value(error), +// FIXME: ideally, this could be auto-derived as an untagged enum, but due to +// https://github.com/serde-rs/serde/issues/1183 this currently fails +impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct ResponseVisitor<'a>(&'a ()); + impl<'de: 'a, 'a> Visitor<'de> for ResponseVisitor<'a> { + type Value = Response<'a>; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a valid jsonrpc 2.0 response object") + } + + fn visit_map(self, mut map: A) -> Result + where + A: MapAccess<'de>, + { + let mut id = None; + let mut jsonrpc = None; + let mut result = None; + let mut error = None; + + while let Some(key) = map.next_key()? { + match key { + "id" => { + let value: u64 = map.next_value()?; + let prev = id.replace(value); + if prev.is_some() { + return Err(de::Error::duplicate_field("id")) + } + } + "jsonrpc" => { + let value: &'de str = map.next_value()?; + if value != "2.0" { + return Err(de::Error::invalid_value(Unexpected::Str(value), &"2.0")) + } + + let prev = jsonrpc.replace(value); + if prev.is_some() { + return Err(de::Error::duplicate_field("jsonrpc")) + } + } + "result" => { + let value: &RawValue = map.next_value()?; + let prev = result.replace(value); + if prev.is_some() { + return Err(de::Error::duplicate_field("result")) + } + } + "error" => { + let value: JsonRpcError = map.next_value()?; + let prev = error.replace(value); + if prev.is_some() { + return Err(de::Error::duplicate_field("error")) + } + } + key => { + return Err(de::Error::unknown_field( + key, + &["id", "jsonrpc", "result", "error"], + )) + } + } + } + + let id = id.ok_or_else(|| de::Error::missing_field("id"))?; + let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?; + + match (result, error) { + (Some(result), None) => Ok(Response::Success { id, jsonrpc, result }), + (None, Some(error)) => Ok(Response::Error { id, jsonrpc, error }), + _ => Err(de::Error::custom( + "response must have either a `result` or `error` field", + )), + } + } } + + deserializer.deserialize_map(ResponseVisitor(&())) } } @@ -129,10 +214,27 @@ mod tests { #[test] fn deser_response() { - let response: Response = - serde_json::from_str(r#"{"jsonrpc": "2.0", "result": 19, "id": 1}"#).unwrap(); - assert_eq!(response.id, 1); - assert_eq!(response.data.into_result().unwrap(), 19); + let _ = + serde_json::from_str::>(r#"{"jsonrpc":"2.0","result":19}"#).unwrap_err(); + let _ = serde_json::from_str::>(r#"{"jsonrpc":"3.0","result":19,"id":1}"#) + .unwrap_err(); + + let response: Response<'_> = + serde_json::from_str(r#"{"jsonrpc":"2.0","result":19,"id":1}"#).unwrap(); + + assert_eq!(response.id(), 1); + let result: u64 = serde_json::from_str(response.into_result().unwrap().get()).unwrap(); + assert_eq!(result, 19); + + let response: Response<'_> = serde_json::from_str( + r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"error occurred"},"id":2}"#, + ) + .unwrap(); + + assert_eq!(response.id(), 2); + let err = response.into_result().unwrap_err(); + assert_eq!(err.code, -32000); + assert_eq!(err.message, "error occurred"); } #[test] diff --git a/ethers-providers/src/transports/http.rs b/ethers-providers/src/transports/http.rs index 87fc57f9..ca77ddf5 100644 --- a/ethers-providers/src/transports/http.rs +++ b/ethers-providers/src/transports/http.rs @@ -73,10 +73,16 @@ impl JsonRpcClient for Provider { let res = self.client.post(self.url.as_ref()).json(&payload).send().await?; let text = res.text().await?; - let res: Response = - serde_json::from_str(&text).map_err(|err| ClientError::SerdeJson { err, text })?; + let response: Response<'_> = match serde_json::from_str(&text) { + Ok(response) => response, + Err(err) => return Err(ClientError::SerdeJson { err, text }), + }; - Ok(res.data.into_result()?) + let raw = response.as_result().map_err(Clone::clone)?; + let res = serde_json::from_str(raw.get()) + .map_err(|err| ClientError::SerdeJson { err, text: raw.to_string() })?; + + Ok(res) } } diff --git a/ethers-providers/src/transports/ipc.rs b/ethers-providers/src/transports/ipc.rs index baf5f155..6b065083 100644 --- a/ethers-providers/src/transports/ipc.rs +++ b/ethers-providers/src/transports/ipc.rs @@ -1,6 +1,6 @@ use crate::{ provider::ProviderError, - transports::common::{JsonRpcError, Notification, Request, Response}, + transports::common::{JsonRpcError, Request, Response}, JsonRpcClient, PubsubClient, }; use ethers_core::types::U256; @@ -10,6 +10,7 @@ use futures_channel::mpsc; use futures_util::stream::{Fuse, StreamExt}; use oneshot::error::RecvError; use serde::{de::DeserializeOwned, Serialize}; +use serde_json::{value::RawValue, Deserializer, StreamDeserializer}; use std::{ collections::HashMap, path::Path, @@ -27,6 +28,8 @@ use tokio::{ use tokio_util::io::ReaderStream; use tracing::{error, warn}; +use super::common::Notification; + /// Unix Domain Sockets (IPC) transport. #[derive(Debug, Clone)] pub struct Ipc { @@ -34,8 +37,8 @@ pub struct Ipc { messages_tx: mpsc::UnboundedSender, } -type Pending = oneshot::Sender; -type Subscription = mpsc::UnboundedSender; +type Pending = oneshot::Sender, JsonRpcError>>; +type Subscription = mpsc::UnboundedSender>; #[derive(Debug)] enum TransportMessage { @@ -93,15 +96,15 @@ impl JsonRpcClient for Ipc { self.send(payload)?; // Wait for the response from the IPC server. - let res = receiver.await?; + let res = receiver.await??; // Parse JSON response. - Ok(serde_json::from_value(res)?) + Ok(serde_json::from_str(res.get())?) } } impl PubsubClient for Ipc { - type NotificationStream = mpsc::UnboundedReceiver; + type NotificationStream = mpsc::UnboundedReceiver>; fn subscribe>(&self, id: T) -> Result { let (sink, stream) = mpsc::unbounded(); @@ -147,7 +150,7 @@ where let f = async move { let mut read_buffer = Vec::new(); loop { - let closed = self.process(&mut read_buffer).await.expect("WS Server panic"); + let closed = self.process(&mut read_buffer).await.expect("IPC Server panic"); if closed && self.pending.is_empty() { break } @@ -216,35 +219,30 @@ where ) -> Result<(), IpcError> { // Extend buffer of previously unread with the new read bytes read_buffer.extend_from_slice(&bytes); - - let read_len = { - // Deserialize as many full elements from the stream as exists - let mut de: serde_json::StreamDeserializer<_, serde_json::Value> = - serde_json::Deserializer::from_slice(read_buffer).into_iter(); - - // Iterate through these elements, and handle responses/notifications - while let Some(Ok(value)) = de.next() { - if let Ok(notification) = - serde_json::from_value::>(value.clone()) - { - // Send notify response if okay. - if let Err(e) = self.notify(notification) { - error!("Failed to send IPC notification: {}", e) - } - } else if let Ok(response) = - serde_json::from_value::>(value) - { - if let Err(e) = self.respond(response) { - error!("Failed to send IPC response: {}", e) - } - } else { - warn!("JSON from IPC stream is not a response or notification"); + // Deserialize as many full elements from the stream as exists + let mut de: StreamDeserializer<_, &RawValue> = + Deserializer::from_slice(read_buffer).into_iter(); + // Iterate through these elements, and handle responses/notifications + while let Some(Ok(raw)) = de.next() { + if let Ok(response) = serde_json::from_str(raw.get()) { + // Send notify response if okay. + if let Err(e) = self.respond(response) { + error!(err = %e, "Failed to send IPC response"); } } - // Get the offset of bytes to handle partial buffer reads - de.byte_offset() - }; + if let Ok(notification) = serde_json::from_str(raw.get()) { + // Send notify response if okay. + if let Err(e) = self.notify(notification) { + error!(err = %e, "Failed to send IPC notification"); + } + } + + warn!("JSON from IPC stream is not a response or notification"); + } + + // Get the offset of bytes to handle partial buffer reads + let read_len = de.byte_offset(); // Reset buffer to just include the partial value bytes. read_buffer.copy_within(read_len.., 0); @@ -255,10 +253,10 @@ where /// Sends notification through the channel based on the ID of the subscription. /// This handles streaming responses. - fn notify(&mut self, notification: Notification) -> Result<(), IpcError> { + fn notify(&mut self, notification: Notification<'_>) -> Result<(), IpcError> { let id = notification.params.subscription; if let Some(tx) = self.subscriptions.get(&id) { - tx.unbounded_send(notification.params.result).map_err(|_| { + tx.unbounded_send(notification.params.result.to_owned()).map_err(|_| { IpcError::ChannelError(format!("Subscription receiver {} dropped", id)) })?; } @@ -269,17 +267,15 @@ where /// Sends JSON response through the channel based on the ID in that response. /// This handles RPC calls with only one response, and the channel entry is dropped after /// sending. - fn respond(&mut self, output: Response) -> Result<(), IpcError> { - let id = output.id; - - // Converts output into result, to send data if valid response. - let value = output.data.into_value()?; + fn respond(&mut self, response: Response<'_>) -> Result<(), IpcError> { + let id = response.id(); + let res = response.into_result(); let response_tx = self.pending.remove(&id).ok_or_else(|| { IpcError::ChannelError("No response channel exists for the response ID".to_string()) })?; - response_tx.send(value).map_err(|_| { + response_tx.send(res).map_err(|_| { IpcError::ChannelError("Receiver channel for response has been dropped".to_string()) })?; @@ -353,7 +349,7 @@ mod test { let mut blocks = Vec::new(); for _ in 0..3 { let item = stream.next().await.unwrap(); - let block = serde_json::from_value::>(item).unwrap(); + let block: Block = serde_json::from_str(item.get()).unwrap(); blocks.push(block.number.unwrap_or_default().as_u64()); } let offset = blocks[0] - block_num; diff --git a/ethers-providers/src/transports/quorum.rs b/ethers-providers/src/transports/quorum.rs index e8dbbbfe..7391510c 100644 --- a/ethers-providers/src/transports/quorum.rs +++ b/ethers-providers/src/transports/quorum.rs @@ -12,7 +12,7 @@ use ethers_core::types::{U256, U64}; use futures_core::Stream; use futures_util::{future::join_all, FutureExt, StreamExt}; use serde::{de::DeserializeOwned, Serialize}; -use serde_json::Value; +use serde_json::{value::RawValue, Value}; use thiserror::Error; /// A provider that bundles multiple providers and only returns a value to the @@ -338,7 +338,8 @@ impl From for ProviderError { pub trait JsonRpcClientWrapper: Send + Sync + fmt::Debug { async fn request(&self, method: &str, params: Value) -> Result; } -type NotificationStream = Box + Send + Unpin + 'static>; +type NotificationStream = + Box> + Send + Unpin + 'static>; pub trait PubsubClientWrapper: JsonRpcClientWrapper { /// Add a subscription to this transport @@ -428,7 +429,7 @@ where // A stream that returns a value and the weight of its provider type WeightedNotificationStream = - Pin + Send + Unpin + 'static>>; + Pin, u64)> + Send + Unpin + 'static>>; /// A Subscription stream that only yields the next value if the underlying /// providers reached quorum. @@ -436,7 +437,7 @@ pub struct QuorumStream { // Weight required to reach quorum quorum_weight: u64, /// The different notifications with their cumulative weight - responses: Vec<(Value, u64)>, + responses: Vec<(Box, u64)>, /// All provider notification streams active: Vec, /// Provider streams that already yielded a new value and are waiting for @@ -451,7 +452,7 @@ impl QuorumStream { } impl Stream for QuorumStream { - type Item = Value; + type Item = Box; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -465,7 +466,9 @@ impl Stream for QuorumStream { match stream.poll_next_unpin(cx) { Poll::Ready(Some((val, response_weight))) => { - if let Some((_, weight)) = this.responses.iter_mut().find(|(v, _)| &val == v) { + if let Some((_, weight)) = + this.responses.iter_mut().find(|(v, _)| val.get() == v.get()) + { *weight += response_weight; if *weight >= this.quorum_weight { // reached quorum with multiple notification diff --git a/ethers-providers/src/transports/ws.rs b/ethers-providers/src/transports/ws.rs index 31306e54..10c1f143 100644 --- a/ethers-providers/src/transports/ws.rs +++ b/ethers-providers/src/transports/ws.rs @@ -1,6 +1,6 @@ use crate::{ provider::ProviderError, - transports::common::{JsonRpcError, Notification, Request, Response}, + transports::common::{JsonRpcError, Request}, JsonRpcClient, PubsubClient, }; use ethers_core::types::U256; @@ -11,7 +11,11 @@ use futures_util::{ sink::{Sink, SinkExt}, stream::{Fuse, Stream, StreamExt}, }; -use serde::{de::DeserializeOwned, Serialize}; +use serde::{ + de::{DeserializeOwned, Error}, + Serialize, +}; +use serde_json::value::RawValue; use std::{ collections::{btree_map::Entry, BTreeMap}, fmt::{self, Debug}, @@ -22,6 +26,8 @@ use std::{ }; use thiserror::Error; +use super::common::{Notification, Response}; + if_wasm! { use wasm_bindgen::prelude::*; use wasm_bindgen_futures::spawn_local; @@ -65,8 +71,8 @@ if_not_wasm! { use tungstenite::client::IntoClientRequest; } -type Pending = oneshot::Sender>; -type Subscription = mpsc::UnboundedSender; +type Pending = oneshot::Sender, JsonRpcError>>; +type Subscription = mpsc::UnboundedSender>; /// Instructions for the `WsServer`. enum Instruction { @@ -78,13 +84,6 @@ enum Instruction { Unsubscribe { id: U256 }, } -#[derive(Debug, serde::Deserialize)] -#[serde(untagged)] -enum Incoming { - Notification(Notification), - Response(Response), -} - /// A JSON-RPC Client over Websockets. /// /// ```no_run @@ -184,19 +183,16 @@ impl JsonRpcClient for Ws { // send the data self.send(payload)?; - // wait for the response - let res = receiver.await?; - - // in case the request itself has any errors - let res = res?; + // wait for the response (the request itself may have errors as well) + let res = receiver.await??; // parse it - Ok(serde_json::from_value(res)?) + Ok(serde_json::from_str(res.get())?) } } impl PubsubClient for Ws { - type NotificationStream = mpsc::UnboundedReceiver; + type NotificationStream = mpsc::UnboundedReceiver>; fn subscribe>(&self, id: T) -> Result { let (sink, stream) = mpsc::unbounded(); @@ -324,30 +320,35 @@ where } async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> { - match serde_json::from_str::(&inner) { - Err(err) => return Err(ClientError::JsonError(err)), + if let Ok(response) = serde_json::from_str::>(&inner) { + if let Some(request) = self.pending.remove(&response.id()) { + if !request.is_canceled() { + request.send(response.into_result()).map_err(to_client_error)?; + } + } - Ok(Incoming::Response(resp)) => { - if let Some(request) = self.pending.remove(&resp.id) { - if !request.is_canceled() { - request.send(resp.data.into_result()).map_err(to_client_error)?; - } - } - } - Ok(Incoming::Notification(notification)) => { - let id = notification.params.subscription; - if let Entry::Occupied(stream) = self.subscriptions.entry(id) { - if let Err(err) = stream.get().unbounded_send(notification.params.result) { - if err.is_disconnected() { - // subscription channel was closed on the receiver end - stream.remove(); - } - return Err(to_client_error(err)) - } - } - } + return Ok(()) } - Ok(()) + + if let Ok(notification) = serde_json::from_str::>(&inner) { + let id = notification.params.subscription; + if let Entry::Occupied(stream) = self.subscriptions.entry(id) { + if let Err(err) = stream.get().unbounded_send(notification.params.result.to_owned()) + { + if err.is_disconnected() { + // subscription channel was closed on the receiver end + stream.remove(); + } + return Err(to_client_error(err)) + } + } + + return Ok(()) + } + + Err(ClientError::JsonError(serde_json::Error::custom( + "response is neither a valid jsonrpc response nor notification", + ))) } #[cfg(target_arch = "wasm32")] @@ -516,7 +517,7 @@ mod tests { let mut blocks = Vec::new(); for _ in 0..3 { let item = stream.next().await.unwrap(); - let block = serde_json::from_value::>(item).unwrap(); + let block: Block = serde_json::from_str(item.get()).unwrap(); blocks.push(block.number.unwrap_or_default().as_u64()); }