Fix IPC handling of jsonrpc errors (#1123)
* fixes ipc jsonrpc error handling, use json RawValue where sensible * ran cargo +nightly fmt Co-authored-by: Oliver Giersch <oliver.giersch@b-tu.de>
This commit is contained in:
parent
6e004e7780
commit
db1870c891
|
@ -20,7 +20,7 @@ async-trait = { version = "0.1.50", default-features = false }
|
|||
hex = { version = "0.4.3", default-features = false, features = ["std"] }
|
||||
reqwest = { version = "0.11.10", default-features = false, features = ["json"] }
|
||||
serde = { version = "1.0.124", default-features = false, features = ["derive"] }
|
||||
serde_json = { version = "1.0.64", default-features = false }
|
||||
serde_json = { version = "1.0.64", default-features = false, features = ["raw_value"] }
|
||||
thiserror = { version = "1.0.30", default-features = false }
|
||||
url = { version = "2.2.2", default-features = false }
|
||||
auto_impl = { version = "0.5.0", default-features = false }
|
||||
|
|
|
@ -5,7 +5,7 @@ use ethers_core::types::{TxHash, U256};
|
|||
use futures_util::stream::Stream;
|
||||
use pin_project::{pin_project, pinned_drop};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::Value;
|
||||
use serde_json::value::RawValue;
|
||||
use std::{
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
|
@ -15,7 +15,7 @@ use std::{
|
|||
/// A transport implementation supporting pub sub subscriptions.
|
||||
pub trait PubsubClient: JsonRpcClient {
|
||||
/// The type of stream this transport returns
|
||||
type NotificationStream: futures_core::Stream<Item = Value> + Send + Unpin;
|
||||
type NotificationStream: futures_core::Stream<Item = Box<RawValue>> + Send + Unpin;
|
||||
|
||||
/// Add a subscription to this transport
|
||||
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, Self::Error>;
|
||||
|
@ -76,7 +76,7 @@ where
|
|||
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
match futures_util::ready!(this.rx.poll_next(ctx)) {
|
||||
Some(item) => match serde_json::from_value(item) {
|
||||
Some(item) => match serde_json::from_str(item.get()) {
|
||||
Ok(res) => Poll::Ready(Some(res)),
|
||||
_ => Poll::Pending,
|
||||
},
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
// Code adapted from: https://github.com/althea-net/guac_rs/tree/master/web3/src/jsonrpc
|
||||
use ethers_core::types::U256;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::fmt;
|
||||
|
||||
use serde::{
|
||||
de::{self, MapAccess, Unexpected, Visitor},
|
||||
Deserialize, Serialize,
|
||||
};
|
||||
use serde_json::{value::RawValue, Value};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Error)]
|
||||
use ethers_core::types::U256;
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Error)]
|
||||
/// A JSON-RPC 2.0 error
|
||||
pub struct JsonRpcError {
|
||||
/// The error code
|
||||
|
@ -36,21 +41,6 @@ pub struct Request<'a, T> {
|
|||
params: T,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
/// A JSON-RPC Notifcation
|
||||
pub struct Notification<R> {
|
||||
#[serde(alias = "JSONRPC")]
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
pub params: Subscription<R>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct Subscription<R> {
|
||||
pub subscription: U256,
|
||||
pub result: R,
|
||||
}
|
||||
|
||||
impl<'a, T> Request<'a, T> {
|
||||
/// Creates a new JSON RPC request
|
||||
pub fn new(id: u64, method: &'a str, params: T) -> Self {
|
||||
|
@ -58,39 +48,134 @@ impl<'a, T> Request<'a, T> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct Response<T> {
|
||||
pub(crate) id: u64,
|
||||
jsonrpc: String,
|
||||
#[serde(flatten)]
|
||||
pub data: ResponseData<T>,
|
||||
/// A JSON-RPC Notifcation
|
||||
#[allow(unused)]
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct Notification<'a> {
|
||||
#[serde(alias = "JSONRPC")]
|
||||
jsonrpc: &'a str,
|
||||
method: &'a str,
|
||||
#[serde(borrow)]
|
||||
pub params: Subscription<'a>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
#[serde(untagged)]
|
||||
pub enum ResponseData<R> {
|
||||
Error { error: JsonRpcError },
|
||||
Success { result: R },
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct Subscription<'a> {
|
||||
pub subscription: U256,
|
||||
#[serde(borrow)]
|
||||
pub result: &'a RawValue,
|
||||
}
|
||||
|
||||
impl<R> ResponseData<R> {
|
||||
/// Consume response and return value
|
||||
pub fn into_result(self) -> Result<R, JsonRpcError> {
|
||||
#[derive(Debug)]
|
||||
pub enum Response<'a> {
|
||||
Success { id: u64, jsonrpc: &'a str, result: &'a RawValue },
|
||||
Error { id: u64, jsonrpc: &'a str, error: JsonRpcError },
|
||||
}
|
||||
|
||||
impl Response<'_> {
|
||||
pub fn id(&self) -> u64 {
|
||||
match self {
|
||||
ResponseData::Success { result } => Ok(result),
|
||||
ResponseData::Error { error } => Err(error),
|
||||
Self::Success { id, .. } => *id,
|
||||
Self::Error { id, .. } => *id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_result(&self) -> Result<&RawValue, &JsonRpcError> {
|
||||
match self {
|
||||
Self::Success { result, .. } => Ok(*result),
|
||||
Self::Error { error, .. } => Err(error),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_result(self) -> Result<Box<RawValue>, JsonRpcError> {
|
||||
match self {
|
||||
Self::Success { result, .. } => Ok(result.to_owned()),
|
||||
Self::Error { error, .. } => Err(error),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ResponseData<serde_json::Value> {
|
||||
/// Encode the error to json value if it is an error
|
||||
#[allow(dead_code)]
|
||||
pub fn into_value(self) -> serde_json::Result<serde_json::Value> {
|
||||
match self {
|
||||
ResponseData::Success { result } => Ok(result),
|
||||
ResponseData::Error { error } => serde_json::to_value(error),
|
||||
// FIXME: ideally, this could be auto-derived as an untagged enum, but due to
|
||||
// https://github.com/serde-rs/serde/issues/1183 this currently fails
|
||||
impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
struct ResponseVisitor<'a>(&'a ());
|
||||
impl<'de: 'a, 'a> Visitor<'de> for ResponseVisitor<'a> {
|
||||
type Value = Response<'a>;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
||||
formatter.write_str("a valid jsonrpc 2.0 response object")
|
||||
}
|
||||
|
||||
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: MapAccess<'de>,
|
||||
{
|
||||
let mut id = None;
|
||||
let mut jsonrpc = None;
|
||||
let mut result = None;
|
||||
let mut error = None;
|
||||
|
||||
while let Some(key) = map.next_key()? {
|
||||
match key {
|
||||
"id" => {
|
||||
let value: u64 = map.next_value()?;
|
||||
let prev = id.replace(value);
|
||||
if prev.is_some() {
|
||||
return Err(de::Error::duplicate_field("id"))
|
||||
}
|
||||
}
|
||||
"jsonrpc" => {
|
||||
let value: &'de str = map.next_value()?;
|
||||
if value != "2.0" {
|
||||
return Err(de::Error::invalid_value(Unexpected::Str(value), &"2.0"))
|
||||
}
|
||||
|
||||
let prev = jsonrpc.replace(value);
|
||||
if prev.is_some() {
|
||||
return Err(de::Error::duplicate_field("jsonrpc"))
|
||||
}
|
||||
}
|
||||
"result" => {
|
||||
let value: &RawValue = map.next_value()?;
|
||||
let prev = result.replace(value);
|
||||
if prev.is_some() {
|
||||
return Err(de::Error::duplicate_field("result"))
|
||||
}
|
||||
}
|
||||
"error" => {
|
||||
let value: JsonRpcError = map.next_value()?;
|
||||
let prev = error.replace(value);
|
||||
if prev.is_some() {
|
||||
return Err(de::Error::duplicate_field("error"))
|
||||
}
|
||||
}
|
||||
key => {
|
||||
return Err(de::Error::unknown_field(
|
||||
key,
|
||||
&["id", "jsonrpc", "result", "error"],
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
|
||||
let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?;
|
||||
|
||||
match (result, error) {
|
||||
(Some(result), None) => Ok(Response::Success { id, jsonrpc, result }),
|
||||
(None, Some(error)) => Ok(Response::Error { id, jsonrpc, error }),
|
||||
_ => Err(de::Error::custom(
|
||||
"response must have either a `result` or `error` field",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_map(ResponseVisitor(&()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,10 +214,27 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn deser_response() {
|
||||
let response: Response<u64> =
|
||||
let _ =
|
||||
serde_json::from_str::<Response<'_>>(r#"{"jsonrpc":"2.0","result":19}"#).unwrap_err();
|
||||
let _ = serde_json::from_str::<Response<'_>>(r#"{"jsonrpc":"3.0","result":19,"id":1}"#)
|
||||
.unwrap_err();
|
||||
|
||||
let response: Response<'_> =
|
||||
serde_json::from_str(r#"{"jsonrpc":"2.0","result":19,"id":1}"#).unwrap();
|
||||
assert_eq!(response.id, 1);
|
||||
assert_eq!(response.data.into_result().unwrap(), 19);
|
||||
|
||||
assert_eq!(response.id(), 1);
|
||||
let result: u64 = serde_json::from_str(response.into_result().unwrap().get()).unwrap();
|
||||
assert_eq!(result, 19);
|
||||
|
||||
let response: Response<'_> = serde_json::from_str(
|
||||
r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"error occurred"},"id":2}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(response.id(), 2);
|
||||
let err = response.into_result().unwrap_err();
|
||||
assert_eq!(err.code, -32000);
|
||||
assert_eq!(err.message, "error occurred");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -73,10 +73,16 @@ impl JsonRpcClient for Provider {
|
|||
|
||||
let res = self.client.post(self.url.as_ref()).json(&payload).send().await?;
|
||||
let text = res.text().await?;
|
||||
let res: Response<R> =
|
||||
serde_json::from_str(&text).map_err(|err| ClientError::SerdeJson { err, text })?;
|
||||
let response: Response<'_> = match serde_json::from_str(&text) {
|
||||
Ok(response) => response,
|
||||
Err(err) => return Err(ClientError::SerdeJson { err, text }),
|
||||
};
|
||||
|
||||
Ok(res.data.into_result()?)
|
||||
let raw = response.as_result().map_err(Clone::clone)?;
|
||||
let res = serde_json::from_str(raw.get())
|
||||
.map_err(|err| ClientError::SerdeJson { err, text: raw.to_string() })?;
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
provider::ProviderError,
|
||||
transports::common::{JsonRpcError, Notification, Request, Response},
|
||||
transports::common::{JsonRpcError, Request, Response},
|
||||
JsonRpcClient, PubsubClient,
|
||||
};
|
||||
use ethers_core::types::U256;
|
||||
|
@ -10,6 +10,7 @@ use futures_channel::mpsc;
|
|||
use futures_util::stream::{Fuse, StreamExt};
|
||||
use oneshot::error::RecvError;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use serde_json::{value::RawValue, Deserializer, StreamDeserializer};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::Path,
|
||||
|
@ -27,6 +28,8 @@ use tokio::{
|
|||
use tokio_util::io::ReaderStream;
|
||||
use tracing::{error, warn};
|
||||
|
||||
use super::common::Notification;
|
||||
|
||||
/// Unix Domain Sockets (IPC) transport.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Ipc {
|
||||
|
@ -34,8 +37,8 @@ pub struct Ipc {
|
|||
messages_tx: mpsc::UnboundedSender<TransportMessage>,
|
||||
}
|
||||
|
||||
type Pending = oneshot::Sender<serde_json::Value>;
|
||||
type Subscription = mpsc::UnboundedSender<serde_json::Value>;
|
||||
type Pending = oneshot::Sender<Result<Box<RawValue>, JsonRpcError>>;
|
||||
type Subscription = mpsc::UnboundedSender<Box<RawValue>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum TransportMessage {
|
||||
|
@ -93,15 +96,15 @@ impl JsonRpcClient for Ipc {
|
|||
self.send(payload)?;
|
||||
|
||||
// Wait for the response from the IPC server.
|
||||
let res = receiver.await?;
|
||||
let res = receiver.await??;
|
||||
|
||||
// Parse JSON response.
|
||||
Ok(serde_json::from_value(res)?)
|
||||
Ok(serde_json::from_str(res.get())?)
|
||||
}
|
||||
}
|
||||
|
||||
impl PubsubClient for Ipc {
|
||||
type NotificationStream = mpsc::UnboundedReceiver<serde_json::Value>;
|
||||
type NotificationStream = mpsc::UnboundedReceiver<Box<RawValue>>;
|
||||
|
||||
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, IpcError> {
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
|
@ -147,7 +150,7 @@ where
|
|||
let f = async move {
|
||||
let mut read_buffer = Vec::new();
|
||||
loop {
|
||||
let closed = self.process(&mut read_buffer).await.expect("WS Server panic");
|
||||
let closed = self.process(&mut read_buffer).await.expect("IPC Server panic");
|
||||
if closed && self.pending.is_empty() {
|
||||
break
|
||||
}
|
||||
|
@ -216,35 +219,30 @@ where
|
|||
) -> Result<(), IpcError> {
|
||||
// Extend buffer of previously unread with the new read bytes
|
||||
read_buffer.extend_from_slice(&bytes);
|
||||
|
||||
let read_len = {
|
||||
// Deserialize as many full elements from the stream as exists
|
||||
let mut de: serde_json::StreamDeserializer<_, serde_json::Value> =
|
||||
serde_json::Deserializer::from_slice(read_buffer).into_iter();
|
||||
|
||||
let mut de: StreamDeserializer<_, &RawValue> =
|
||||
Deserializer::from_slice(read_buffer).into_iter();
|
||||
// Iterate through these elements, and handle responses/notifications
|
||||
while let Some(Ok(value)) = de.next() {
|
||||
if let Ok(notification) =
|
||||
serde_json::from_value::<Notification<serde_json::Value>>(value.clone())
|
||||
{
|
||||
while let Some(Ok(raw)) = de.next() {
|
||||
if let Ok(response) = serde_json::from_str(raw.get()) {
|
||||
// Send notify response if okay.
|
||||
if let Err(e) = self.respond(response) {
|
||||
error!(err = %e, "Failed to send IPC response");
|
||||
}
|
||||
}
|
||||
|
||||
if let Ok(notification) = serde_json::from_str(raw.get()) {
|
||||
// Send notify response if okay.
|
||||
if let Err(e) = self.notify(notification) {
|
||||
error!("Failed to send IPC notification: {}", e)
|
||||
error!(err = %e, "Failed to send IPC notification");
|
||||
}
|
||||
} else if let Ok(response) =
|
||||
serde_json::from_value::<Response<serde_json::Value>>(value)
|
||||
{
|
||||
if let Err(e) = self.respond(response) {
|
||||
error!("Failed to send IPC response: {}", e)
|
||||
}
|
||||
} else {
|
||||
|
||||
warn!("JSON from IPC stream is not a response or notification");
|
||||
}
|
||||
}
|
||||
|
||||
// Get the offset of bytes to handle partial buffer reads
|
||||
de.byte_offset()
|
||||
};
|
||||
let read_len = de.byte_offset();
|
||||
|
||||
// Reset buffer to just include the partial value bytes.
|
||||
read_buffer.copy_within(read_len.., 0);
|
||||
|
@ -255,10 +253,10 @@ where
|
|||
|
||||
/// Sends notification through the channel based on the ID of the subscription.
|
||||
/// This handles streaming responses.
|
||||
fn notify(&mut self, notification: Notification<serde_json::Value>) -> Result<(), IpcError> {
|
||||
fn notify(&mut self, notification: Notification<'_>) -> Result<(), IpcError> {
|
||||
let id = notification.params.subscription;
|
||||
if let Some(tx) = self.subscriptions.get(&id) {
|
||||
tx.unbounded_send(notification.params.result).map_err(|_| {
|
||||
tx.unbounded_send(notification.params.result.to_owned()).map_err(|_| {
|
||||
IpcError::ChannelError(format!("Subscription receiver {} dropped", id))
|
||||
})?;
|
||||
}
|
||||
|
@ -269,17 +267,15 @@ where
|
|||
/// Sends JSON response through the channel based on the ID in that response.
|
||||
/// This handles RPC calls with only one response, and the channel entry is dropped after
|
||||
/// sending.
|
||||
fn respond(&mut self, output: Response<serde_json::Value>) -> Result<(), IpcError> {
|
||||
let id = output.id;
|
||||
|
||||
// Converts output into result, to send data if valid response.
|
||||
let value = output.data.into_value()?;
|
||||
fn respond(&mut self, response: Response<'_>) -> Result<(), IpcError> {
|
||||
let id = response.id();
|
||||
let res = response.into_result();
|
||||
|
||||
let response_tx = self.pending.remove(&id).ok_or_else(|| {
|
||||
IpcError::ChannelError("No response channel exists for the response ID".to_string())
|
||||
})?;
|
||||
|
||||
response_tx.send(value).map_err(|_| {
|
||||
response_tx.send(res).map_err(|_| {
|
||||
IpcError::ChannelError("Receiver channel for response has been dropped".to_string())
|
||||
})?;
|
||||
|
||||
|
@ -353,7 +349,7 @@ mod test {
|
|||
let mut blocks = Vec::new();
|
||||
for _ in 0..3 {
|
||||
let item = stream.next().await.unwrap();
|
||||
let block = serde_json::from_value::<Block<TxHash>>(item).unwrap();
|
||||
let block: Block<TxHash> = serde_json::from_str(item.get()).unwrap();
|
||||
blocks.push(block.number.unwrap_or_default().as_u64());
|
||||
}
|
||||
let offset = blocks[0] - block_num;
|
||||
|
|
|
@ -12,7 +12,7 @@ use ethers_core::types::{U256, U64};
|
|||
use futures_core::Stream;
|
||||
use futures_util::{future::join_all, FutureExt, StreamExt};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use serde_json::Value;
|
||||
use serde_json::{value::RawValue, Value};
|
||||
use thiserror::Error;
|
||||
|
||||
/// A provider that bundles multiple providers and only returns a value to the
|
||||
|
@ -338,7 +338,8 @@ impl From<QuorumError> for ProviderError {
|
|||
pub trait JsonRpcClientWrapper: Send + Sync + fmt::Debug {
|
||||
async fn request(&self, method: &str, params: Value) -> Result<Value, ProviderError>;
|
||||
}
|
||||
type NotificationStream = Box<dyn futures_core::Stream<Item = Value> + Send + Unpin + 'static>;
|
||||
type NotificationStream =
|
||||
Box<dyn futures_core::Stream<Item = Box<RawValue>> + Send + Unpin + 'static>;
|
||||
|
||||
pub trait PubsubClientWrapper: JsonRpcClientWrapper {
|
||||
/// Add a subscription to this transport
|
||||
|
@ -428,7 +429,7 @@ where
|
|||
|
||||
// A stream that returns a value and the weight of its provider
|
||||
type WeightedNotificationStream =
|
||||
Pin<Box<dyn futures_core::Stream<Item = (Value, u64)> + Send + Unpin + 'static>>;
|
||||
Pin<Box<dyn futures_core::Stream<Item = (Box<RawValue>, u64)> + Send + Unpin + 'static>>;
|
||||
|
||||
/// A Subscription stream that only yields the next value if the underlying
|
||||
/// providers reached quorum.
|
||||
|
@ -436,7 +437,7 @@ pub struct QuorumStream {
|
|||
// Weight required to reach quorum
|
||||
quorum_weight: u64,
|
||||
/// The different notifications with their cumulative weight
|
||||
responses: Vec<(Value, u64)>,
|
||||
responses: Vec<(Box<RawValue>, u64)>,
|
||||
/// All provider notification streams
|
||||
active: Vec<WeightedNotificationStream>,
|
||||
/// Provider streams that already yielded a new value and are waiting for
|
||||
|
@ -451,7 +452,7 @@ impl QuorumStream {
|
|||
}
|
||||
|
||||
impl Stream for QuorumStream {
|
||||
type Item = Value;
|
||||
type Item = Box<RawValue>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
@ -465,7 +466,9 @@ impl Stream for QuorumStream {
|
|||
|
||||
match stream.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some((val, response_weight))) => {
|
||||
if let Some((_, weight)) = this.responses.iter_mut().find(|(v, _)| &val == v) {
|
||||
if let Some((_, weight)) =
|
||||
this.responses.iter_mut().find(|(v, _)| val.get() == v.get())
|
||||
{
|
||||
*weight += response_weight;
|
||||
if *weight >= this.quorum_weight {
|
||||
// reached quorum with multiple notification
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
provider::ProviderError,
|
||||
transports::common::{JsonRpcError, Notification, Request, Response},
|
||||
transports::common::{JsonRpcError, Request},
|
||||
JsonRpcClient, PubsubClient,
|
||||
};
|
||||
use ethers_core::types::U256;
|
||||
|
@ -11,7 +11,11 @@ use futures_util::{
|
|||
sink::{Sink, SinkExt},
|
||||
stream::{Fuse, Stream, StreamExt},
|
||||
};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use serde::{
|
||||
de::{DeserializeOwned, Error},
|
||||
Serialize,
|
||||
};
|
||||
use serde_json::value::RawValue;
|
||||
use std::{
|
||||
collections::{btree_map::Entry, BTreeMap},
|
||||
fmt::{self, Debug},
|
||||
|
@ -22,6 +26,8 @@ use std::{
|
|||
};
|
||||
use thiserror::Error;
|
||||
|
||||
use super::common::{Notification, Response};
|
||||
|
||||
if_wasm! {
|
||||
use wasm_bindgen::prelude::*;
|
||||
use wasm_bindgen_futures::spawn_local;
|
||||
|
@ -65,8 +71,8 @@ if_not_wasm! {
|
|||
use tungstenite::client::IntoClientRequest;
|
||||
}
|
||||
|
||||
type Pending = oneshot::Sender<Result<serde_json::Value, JsonRpcError>>;
|
||||
type Subscription = mpsc::UnboundedSender<serde_json::Value>;
|
||||
type Pending = oneshot::Sender<Result<Box<RawValue>, JsonRpcError>>;
|
||||
type Subscription = mpsc::UnboundedSender<Box<RawValue>>;
|
||||
|
||||
/// Instructions for the `WsServer`.
|
||||
enum Instruction {
|
||||
|
@ -78,13 +84,6 @@ enum Instruction {
|
|||
Unsubscribe { id: U256 },
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum Incoming {
|
||||
Notification(Notification<serde_json::Value>),
|
||||
Response(Response<serde_json::Value>),
|
||||
}
|
||||
|
||||
/// A JSON-RPC Client over Websockets.
|
||||
///
|
||||
/// ```no_run
|
||||
|
@ -184,19 +183,16 @@ impl JsonRpcClient for Ws {
|
|||
// send the data
|
||||
self.send(payload)?;
|
||||
|
||||
// wait for the response
|
||||
let res = receiver.await?;
|
||||
|
||||
// in case the request itself has any errors
|
||||
let res = res?;
|
||||
// wait for the response (the request itself may have errors as well)
|
||||
let res = receiver.await??;
|
||||
|
||||
// parse it
|
||||
Ok(serde_json::from_value(res)?)
|
||||
Ok(serde_json::from_str(res.get())?)
|
||||
}
|
||||
}
|
||||
|
||||
impl PubsubClient for Ws {
|
||||
type NotificationStream = mpsc::UnboundedReceiver<serde_json::Value>;
|
||||
type NotificationStream = mpsc::UnboundedReceiver<Box<RawValue>>;
|
||||
|
||||
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, ClientError> {
|
||||
let (sink, stream) = mpsc::unbounded();
|
||||
|
@ -324,20 +320,21 @@ where
|
|||
}
|
||||
|
||||
async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> {
|
||||
match serde_json::from_str::<Incoming>(&inner) {
|
||||
Err(err) => return Err(ClientError::JsonError(err)),
|
||||
|
||||
Ok(Incoming::Response(resp)) => {
|
||||
if let Some(request) = self.pending.remove(&resp.id) {
|
||||
if let Ok(response) = serde_json::from_str::<Response<'_>>(&inner) {
|
||||
if let Some(request) = self.pending.remove(&response.id()) {
|
||||
if !request.is_canceled() {
|
||||
request.send(resp.data.into_result()).map_err(to_client_error)?;
|
||||
request.send(response.into_result()).map_err(to_client_error)?;
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(())
|
||||
}
|
||||
Ok(Incoming::Notification(notification)) => {
|
||||
|
||||
if let Ok(notification) = serde_json::from_str::<Notification<'_>>(&inner) {
|
||||
let id = notification.params.subscription;
|
||||
if let Entry::Occupied(stream) = self.subscriptions.entry(id) {
|
||||
if let Err(err) = stream.get().unbounded_send(notification.params.result) {
|
||||
if let Err(err) = stream.get().unbounded_send(notification.params.result.to_owned())
|
||||
{
|
||||
if err.is_disconnected() {
|
||||
// subscription channel was closed on the receiver end
|
||||
stream.remove();
|
||||
|
@ -345,9 +342,13 @@ where
|
|||
return Err(to_client_error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(())
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
Err(ClientError::JsonError(serde_json::Error::custom(
|
||||
"response is neither a valid jsonrpc response nor notification",
|
||||
)))
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
|
@ -516,7 +517,7 @@ mod tests {
|
|||
let mut blocks = Vec::new();
|
||||
for _ in 0..3 {
|
||||
let item = stream.next().await.unwrap();
|
||||
let block = serde_json::from_value::<Block<TxHash>>(item).unwrap();
|
||||
let block: Block<TxHash> = serde_json::from_str(item.get()).unwrap();
|
||||
blocks.push(block.number.unwrap_or_default().as_u64());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue