refactors ipc transport internals (#1174)
* refactors ipc transport internals ran cargo +nightly fmt fixes typo remove some commented out code * remove one unnecessary return stmt Co-authored-by: Oliver Giersch <oliver.giersch@b-tu.de> Co-authored-by: Oliver Giersch <oliver.giersch@gmail.com>
This commit is contained in:
parent
77bd9d49c8
commit
a115e957db
|
@ -1321,6 +1321,7 @@ dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-timer",
|
"futures-timer",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
"hashers",
|
||||||
"hex",
|
"hex",
|
||||||
"http",
|
"http",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
|
@ -1634,6 +1635,15 @@ dependencies = [
|
||||||
"slab",
|
"slab",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "fxhash"
|
||||||
|
version = "0.2.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
|
||||||
|
dependencies = [
|
||||||
|
"byteorder",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "generic-array"
|
name = "generic-array"
|
||||||
version = "0.12.4"
|
version = "0.12.4"
|
||||||
|
@ -1725,6 +1735,15 @@ version = "0.11.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hashers"
|
||||||
|
version = "1.0.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b2bca93b15ea5a746f220e56587f71e73c6165eab783df9e26590069953e3c30"
|
||||||
|
dependencies = [
|
||||||
|
"fxhash",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "heck"
|
name = "heck"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
|
|
@ -40,6 +40,7 @@ tracing-futures = { version = "0.2.5", default-features = false, features = ["st
|
||||||
|
|
||||||
bytes = { version = "1.1.0", default-features = false, optional = true }
|
bytes = { version = "1.1.0", default-features = false, optional = true }
|
||||||
once_cell = "1.10.0"
|
once_cell = "1.10.0"
|
||||||
|
hashers = "1.0.1"
|
||||||
|
|
||||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||||
# tokio
|
# tokio
|
||||||
|
|
|
@ -48,54 +48,21 @@ impl<'a, T> Request<'a, T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A JSON-RPC Notifcation
|
/// A JSON-RPC response
|
||||||
#[allow(unused)]
|
#[derive(Debug)]
|
||||||
#[derive(Deserialize, Debug)]
|
pub enum Response<'a> {
|
||||||
pub struct Notification<'a> {
|
Success { id: u64, result: &'a RawValue },
|
||||||
#[serde(alias = "JSONRPC")]
|
Error { id: u64, error: JsonRpcError },
|
||||||
jsonrpc: &'a str,
|
Notification { method: &'a str, params: Params<'a> },
|
||||||
method: &'a str,
|
|
||||||
#[serde(borrow)]
|
|
||||||
pub params: Subscription<'a>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
pub struct Subscription<'a> {
|
pub struct Params<'a> {
|
||||||
pub subscription: U256,
|
pub subscription: U256,
|
||||||
#[serde(borrow)]
|
#[serde(borrow)]
|
||||||
pub result: &'a RawValue,
|
pub result: &'a RawValue,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Response<'a> {
|
|
||||||
Success { id: u64, jsonrpc: &'a str, result: &'a RawValue },
|
|
||||||
Error { id: u64, jsonrpc: &'a str, error: JsonRpcError },
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(unused)]
|
|
||||||
impl Response<'_> {
|
|
||||||
pub fn id(&self) -> u64 {
|
|
||||||
match self {
|
|
||||||
Self::Success { id, .. } => *id,
|
|
||||||
Self::Error { id, .. } => *id,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn as_result(&self) -> Result<&RawValue, &JsonRpcError> {
|
|
||||||
match self {
|
|
||||||
Self::Success { result, .. } => Ok(*result),
|
|
||||||
Self::Error { error, .. } => Err(error),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_result(self) -> Result<Box<RawValue>, JsonRpcError> {
|
|
||||||
match self {
|
|
||||||
Self::Success { result, .. } => Ok(result.to_owned()),
|
|
||||||
Self::Error { error, .. } => Err(error),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: ideally, this could be auto-derived as an untagged enum, but due to
|
// FIXME: ideally, this could be auto-derived as an untagged enum, but due to
|
||||||
// https://github.com/serde-rs/serde/issues/1183 this currently fails
|
// https://github.com/serde-rs/serde/issues/1183 this currently fails
|
||||||
impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> {
|
impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> {
|
||||||
|
@ -115,62 +82,96 @@ impl<'de: 'a, 'a> Deserialize<'de> for Response<'a> {
|
||||||
where
|
where
|
||||||
A: MapAccess<'de>,
|
A: MapAccess<'de>,
|
||||||
{
|
{
|
||||||
|
let mut jsonrpc = false;
|
||||||
|
|
||||||
|
// response & error
|
||||||
let mut id = None;
|
let mut id = None;
|
||||||
let mut jsonrpc = None;
|
// only response
|
||||||
let mut result = None;
|
let mut result = None;
|
||||||
|
// only error
|
||||||
let mut error = None;
|
let mut error = None;
|
||||||
|
// only notification
|
||||||
|
let mut method = None;
|
||||||
|
let mut params = None;
|
||||||
|
|
||||||
while let Some(key) = map.next_key()? {
|
while let Some(key) = map.next_key()? {
|
||||||
match key {
|
match key {
|
||||||
"id" => {
|
|
||||||
let value: u64 = map.next_value()?;
|
|
||||||
let prev = id.replace(value);
|
|
||||||
if prev.is_some() {
|
|
||||||
return Err(de::Error::duplicate_field("id"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
"jsonrpc" => {
|
"jsonrpc" => {
|
||||||
let value: &'de str = map.next_value()?;
|
if jsonrpc {
|
||||||
|
return Err(de::Error::duplicate_field("jsonrpc"))
|
||||||
|
}
|
||||||
|
|
||||||
|
let value = map.next_value()?;
|
||||||
if value != "2.0" {
|
if value != "2.0" {
|
||||||
return Err(de::Error::invalid_value(Unexpected::Str(value), &"2.0"))
|
return Err(de::Error::invalid_value(Unexpected::Str(value), &"2.0"))
|
||||||
}
|
}
|
||||||
|
|
||||||
let prev = jsonrpc.replace(value);
|
jsonrpc = true;
|
||||||
if prev.is_some() {
|
}
|
||||||
return Err(de::Error::duplicate_field("jsonrpc"))
|
"id" => {
|
||||||
|
if id.is_some() {
|
||||||
|
return Err(de::Error::duplicate_field("id"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let value: u64 = map.next_value()?;
|
||||||
|
id = Some(value);
|
||||||
}
|
}
|
||||||
"result" => {
|
"result" => {
|
||||||
let value: &RawValue = map.next_value()?;
|
if result.is_some() {
|
||||||
let prev = result.replace(value);
|
|
||||||
if prev.is_some() {
|
|
||||||
return Err(de::Error::duplicate_field("result"))
|
return Err(de::Error::duplicate_field("result"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let value: &RawValue = map.next_value()?;
|
||||||
|
result = Some(value);
|
||||||
}
|
}
|
||||||
"error" => {
|
"error" => {
|
||||||
let value: JsonRpcError = map.next_value()?;
|
if error.is_some() {
|
||||||
let prev = error.replace(value);
|
|
||||||
if prev.is_some() {
|
|
||||||
return Err(de::Error::duplicate_field("error"))
|
return Err(de::Error::duplicate_field("error"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let value: JsonRpcError = map.next_value()?;
|
||||||
|
error = Some(value);
|
||||||
|
}
|
||||||
|
"method" => {
|
||||||
|
if method.is_some() {
|
||||||
|
return Err(de::Error::duplicate_field("method"))
|
||||||
|
}
|
||||||
|
|
||||||
|
let value: &str = map.next_value()?;
|
||||||
|
method = Some(value);
|
||||||
|
}
|
||||||
|
"params" => {
|
||||||
|
if params.is_some() {
|
||||||
|
return Err(de::Error::duplicate_field("params"))
|
||||||
|
}
|
||||||
|
|
||||||
|
let value: Params = map.next_value()?;
|
||||||
|
params = Some(value);
|
||||||
}
|
}
|
||||||
key => {
|
key => {
|
||||||
return Err(de::Error::unknown_field(
|
return Err(de::Error::unknown_field(
|
||||||
key,
|
key,
|
||||||
&["id", "jsonrpc", "result", "error"],
|
&["id", "jsonrpc", "result", "error", "params", "method"],
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
|
// jsonrpc version must be present in all responses
|
||||||
let jsonrpc = jsonrpc.ok_or_else(|| de::Error::missing_field("jsonrpc"))?;
|
if !jsonrpc {
|
||||||
|
return Err(de::Error::missing_field("jsonrpc"))
|
||||||
|
}
|
||||||
|
|
||||||
match (result, error) {
|
match (id, result, error, method, params) {
|
||||||
(Some(result), None) => Ok(Response::Success { id, jsonrpc, result }),
|
(Some(id), Some(result), None, None, None) => {
|
||||||
(None, Some(error)) => Ok(Response::Error { id, jsonrpc, error }),
|
Ok(Response::Success { id, result })
|
||||||
|
}
|
||||||
|
(Some(id), None, Some(error), None, None) => Ok(Response::Error { id, error }),
|
||||||
|
(None, None, None, Some(method), Some(params)) => {
|
||||||
|
Ok(Response::Notification { method, params })
|
||||||
|
}
|
||||||
_ => Err(de::Error::custom(
|
_ => Err(de::Error::custom(
|
||||||
"response must have either a `result` or `error` field",
|
"response must be either a success/error or notification object",
|
||||||
)),
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -223,19 +224,29 @@ mod tests {
|
||||||
let response: Response<'_> =
|
let response: Response<'_> =
|
||||||
serde_json::from_str(r#"{"jsonrpc":"2.0","result":19,"id":1}"#).unwrap();
|
serde_json::from_str(r#"{"jsonrpc":"2.0","result":19,"id":1}"#).unwrap();
|
||||||
|
|
||||||
assert_eq!(response.id(), 1);
|
match response {
|
||||||
let result: u64 = serde_json::from_str(response.into_result().unwrap().get()).unwrap();
|
Response::Success { id, result } => {
|
||||||
assert_eq!(result, 19);
|
assert_eq!(id, 1);
|
||||||
|
let result: u64 = serde_json::from_str(result.get()).unwrap();
|
||||||
|
assert_eq!(result, 19);
|
||||||
|
}
|
||||||
|
_ => panic!("expected `Success` response"),
|
||||||
|
}
|
||||||
|
|
||||||
let response: Response<'_> = serde_json::from_str(
|
let response: Response<'_> = serde_json::from_str(
|
||||||
r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"error occurred"},"id":2}"#,
|
r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"error occurred"},"id":2}"#,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
assert_eq!(response.id(), 2);
|
match response {
|
||||||
let err = response.into_result().unwrap_err();
|
Response::Error { id, error } => {
|
||||||
assert_eq!(err.code, -32000);
|
assert_eq!(id, 2);
|
||||||
assert_eq!(err.message, "error occurred");
|
assert_eq!(error.code, -32000);
|
||||||
|
assert_eq!(error.message, "error occurred");
|
||||||
|
assert!(error.data.is_none());
|
||||||
|
}
|
||||||
|
_ => panic!("expected `Error` response"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
@ -73,12 +73,20 @@ impl JsonRpcClient for Provider {
|
||||||
|
|
||||||
let res = self.client.post(self.url.as_ref()).json(&payload).send().await?;
|
let res = self.client.post(self.url.as_ref()).json(&payload).send().await?;
|
||||||
let text = res.text().await?;
|
let text = res.text().await?;
|
||||||
let response: Response<'_> = match serde_json::from_str(&text) {
|
|
||||||
Ok(response) => response,
|
let raw = match serde_json::from_str(&text) {
|
||||||
|
Ok(Response::Success { result, .. }) => result.to_owned(),
|
||||||
|
Ok(Response::Error { error, .. }) => return Err(error.into()),
|
||||||
|
Ok(_) => {
|
||||||
|
let err = ClientError::SerdeJson {
|
||||||
|
err: serde::de::Error::custom("unexpected notification over HTTP transport"),
|
||||||
|
text,
|
||||||
|
};
|
||||||
|
return Err(err)
|
||||||
|
}
|
||||||
Err(err) => return Err(ClientError::SerdeJson { err, text }),
|
Err(err) => return Err(ClientError::SerdeJson { err, text }),
|
||||||
};
|
};
|
||||||
|
|
||||||
let raw = response.as_result().map_err(Clone::clone)?;
|
|
||||||
let res = serde_json::from_str(raw.get())
|
let res = serde_json::from_str(raw.get())
|
||||||
.map_err(|err| ClientError::SerdeJson { err, text: raw.to_string() })?;
|
.map_err(|err| ClientError::SerdeJson { err, text: raw.to_string() })?;
|
||||||
|
|
||||||
|
|
|
@ -1,71 +1,75 @@
|
||||||
use crate::{
|
|
||||||
provider::ProviderError,
|
|
||||||
transports::common::{JsonRpcError, Request, Response},
|
|
||||||
JsonRpcClient, PubsubClient,
|
|
||||||
};
|
|
||||||
use ethers_core::types::U256;
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use futures_channel::mpsc;
|
|
||||||
use futures_util::stream::{Fuse, StreamExt};
|
|
||||||
use oneshot::error::RecvError;
|
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
|
||||||
use serde_json::{value::RawValue, Deserializer, StreamDeserializer};
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
cell::RefCell,
|
||||||
|
convert::Infallible,
|
||||||
|
hash::BuildHasherDefault,
|
||||||
path::Path,
|
path::Path,
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicU64, Ordering},
|
atomic::{AtomicU64, Ordering},
|
||||||
Arc,
|
Arc,
|
||||||
},
|
},
|
||||||
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use bytes::{Buf as _, BytesMut};
|
||||||
|
use ethers_core::types::U256;
|
||||||
|
use futures_channel::mpsc;
|
||||||
|
use futures_util::stream::StreamExt as _;
|
||||||
|
use hashers::fx_hash::FxHasher64;
|
||||||
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
|
use serde_json::{value::RawValue, Deserializer};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf},
|
io::{AsyncReadExt as _, AsyncWriteExt as _, BufReader},
|
||||||
net::UnixStream,
|
net::{
|
||||||
sync::oneshot,
|
unix::{ReadHalf, WriteHalf},
|
||||||
|
UnixStream,
|
||||||
|
},
|
||||||
|
runtime,
|
||||||
|
sync::oneshot::{self, error::RecvError},
|
||||||
};
|
};
|
||||||
use tokio_util::io::ReaderStream;
|
|
||||||
use tracing::{error, warn};
|
|
||||||
|
|
||||||
use super::common::Notification;
|
use crate::{
|
||||||
|
provider::ProviderError,
|
||||||
|
transports::common::{JsonRpcError, Request, Response},
|
||||||
|
JsonRpcClient, PubsubClient,
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::common::Params;
|
||||||
|
|
||||||
|
type FxHashMap<K, V> = std::collections::HashMap<K, V, BuildHasherDefault<FxHasher64>>;
|
||||||
|
|
||||||
|
type Pending = oneshot::Sender<Result<Box<RawValue>, JsonRpcError>>;
|
||||||
|
type Subscription = mpsc::UnboundedSender<Box<RawValue>>;
|
||||||
|
|
||||||
/// Unix Domain Sockets (IPC) transport.
|
/// Unix Domain Sockets (IPC) transport.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Ipc {
|
pub struct Ipc {
|
||||||
id: Arc<AtomicU64>,
|
id: Arc<AtomicU64>,
|
||||||
messages_tx: mpsc::UnboundedSender<TransportMessage>,
|
request_tx: mpsc::UnboundedSender<TransportMessage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type Pending = oneshot::Sender<Result<Box<RawValue>, JsonRpcError>>;
|
|
||||||
type Subscription = mpsc::UnboundedSender<Box<RawValue>>;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum TransportMessage {
|
enum TransportMessage {
|
||||||
Request { id: u64, request: String, sender: Pending },
|
Request { id: u64, request: Box<[u8]>, sender: Pending },
|
||||||
Subscribe { id: U256, sink: Subscription },
|
Subscribe { id: U256, sink: Subscription },
|
||||||
Unsubscribe { id: U256 },
|
Unsubscribe { id: U256 },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Ipc {
|
impl Ipc {
|
||||||
/// Creates a new IPC transport from a Async Reader / Writer
|
/// Creates a new IPC transport from a given path using Unix sockets.
|
||||||
fn new<S: AsyncRead + AsyncWrite + Send + 'static>(stream: S) -> Self {
|
pub async fn connect(path: impl AsRef<Path>) -> Result<Self, IpcError> {
|
||||||
let id = Arc::new(AtomicU64::new(1));
|
let id = Arc::new(AtomicU64::new(1));
|
||||||
let (messages_tx, messages_rx) = mpsc::unbounded();
|
let (request_tx, request_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
IpcServer::new(stream, messages_rx).spawn();
|
let stream = UnixStream::connect(path).await?;
|
||||||
Self { id, messages_tx }
|
spawn_ipc_server(stream, request_rx);
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new IPC transport from a given path using Unix sockets
|
Ok(Self { id, request_tx })
|
||||||
#[cfg(unix)]
|
|
||||||
pub async fn connect<P: AsRef<Path>>(path: P) -> Result<Self, IpcError> {
|
|
||||||
let ipc = UnixStream::connect(path).await?;
|
|
||||||
Ok(Self::new(ipc))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send(&self, msg: TransportMessage) -> Result<(), IpcError> {
|
fn send(&self, msg: TransportMessage) -> Result<(), IpcError> {
|
||||||
self.messages_tx
|
self.request_tx
|
||||||
.unbounded_send(msg)
|
.unbounded_send(msg)
|
||||||
.map_err(|_| IpcError::ChannelError("IPC server receiver dropped".to_string()))?;
|
.map_err(|_| IpcError::ChannelError("IPC server receiver dropped".to_string()))?;
|
||||||
|
|
||||||
|
@ -88,7 +92,7 @@ impl JsonRpcClient for Ipc {
|
||||||
let (sender, receiver) = oneshot::channel();
|
let (sender, receiver) = oneshot::channel();
|
||||||
let payload = TransportMessage::Request {
|
let payload = TransportMessage::Request {
|
||||||
id: next_id,
|
id: next_id,
|
||||||
request: serde_json::to_string(&Request::new(next_id, method, params))?,
|
request: serde_json::to_vec(&Request::new(next_id, method, params))?.into_boxed_slice(),
|
||||||
sender,
|
sender,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -117,169 +121,171 @@ impl PubsubClient for Ipc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct IpcServer<T> {
|
fn spawn_ipc_server(stream: UnixStream, request_rx: mpsc::UnboundedReceiver<TransportMessage>) {
|
||||||
socket_reader: Fuse<ReaderStream<ReadHalf<T>>>,
|
// 65 KiB should be more than enough for this thread, as all unbounded data
|
||||||
socket_writer: WriteHalf<T>,
|
// growth occurs on heap-allocated data structures and buffers and the call
|
||||||
requests: Fuse<mpsc::UnboundedReceiver<TransportMessage>>,
|
// stack is not going to do anything crazy either
|
||||||
pending: HashMap<u64, Pending>,
|
const STACK_SIZE: usize = 1 << 16;
|
||||||
subscriptions: HashMap<U256, Subscription>,
|
// spawn a light-weight thread with a thread-local async runtime just for
|
||||||
|
// sending and receiving data over the IPC socket
|
||||||
|
let _ = thread::Builder::new()
|
||||||
|
.name("ipc-server-thread".to_string())
|
||||||
|
.stack_size(STACK_SIZE)
|
||||||
|
.spawn(move || {
|
||||||
|
let rt = runtime::Builder::new_current_thread()
|
||||||
|
.enable_io()
|
||||||
|
.build()
|
||||||
|
.expect("failed to create ipc-server-thread async runtime");
|
||||||
|
|
||||||
|
rt.block_on(run_ipc_server(stream, request_rx));
|
||||||
|
})
|
||||||
|
.expect("failed to spawn ipc server thread");
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> IpcServer<T>
|
async fn run_ipc_server(
|
||||||
where
|
mut stream: UnixStream,
|
||||||
T: AsyncRead + AsyncWrite,
|
request_rx: mpsc::UnboundedReceiver<TransportMessage>,
|
||||||
{
|
) {
|
||||||
/// Instantiates the Websocket Server
|
// the shared state for both reads & writes
|
||||||
pub fn new(ipc: T, requests: mpsc::UnboundedReceiver<TransportMessage>) -> Self {
|
let shared = Shared {
|
||||||
let (socket_reader, socket_writer) = tokio::io::split(ipc);
|
pending: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(),
|
||||||
let socket_reader = ReaderStream::new(socket_reader).fuse();
|
subs: FxHashMap::with_capacity_and_hasher(64, BuildHasherDefault::default()).into(),
|
||||||
Self {
|
};
|
||||||
socket_reader,
|
|
||||||
socket_writer,
|
// split the stream and run two independent concurrently (local), thereby
|
||||||
requests: requests.fuse(),
|
// allowing reads and writes to occurr concurrently
|
||||||
pending: HashMap::default(),
|
let (reader, writer) = stream.split();
|
||||||
subscriptions: HashMap::default(),
|
let read = shared.handle_ipc_reads(reader);
|
||||||
|
let write = shared.handle_ipc_writes(writer, request_rx);
|
||||||
|
|
||||||
|
// run both loops concurrently, until either encounts an error
|
||||||
|
if let Err(e) = futures_util::try_join!(read, write) {
|
||||||
|
match e {
|
||||||
|
IpcError::ServerExit => {}
|
||||||
|
err => tracing::error!(?err, "exiting IPC server due to error"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Shared {
|
||||||
|
pending: RefCell<FxHashMap<u64, Pending>>,
|
||||||
|
subs: RefCell<FxHashMap<U256, Subscription>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Shared {
|
||||||
|
async fn handle_ipc_reads(&self, reader: ReadHalf<'_>) -> Result<Infallible, IpcError> {
|
||||||
|
let mut reader = BufReader::new(reader);
|
||||||
|
let mut buf = BytesMut::with_capacity(4096);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// try to read the next batch of bytes into the buffer
|
||||||
|
let read = reader.read_buf(&mut buf).await?;
|
||||||
|
if read == 0 {
|
||||||
|
// eof, socket was closed
|
||||||
|
return Err(IpcError::ServerExit)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse the received bytes into 0-n jsonrpc messages
|
||||||
|
let read = self.handle_bytes(&buf)?;
|
||||||
|
// split off all bytes that were parsed into complete messages
|
||||||
|
// any remaining bytes that correspond to incomplete messages remain
|
||||||
|
// in the buffer
|
||||||
|
buf.advance(read);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns the event loop
|
async fn handle_ipc_writes(
|
||||||
fn spawn(mut self)
|
&self,
|
||||||
where
|
mut writer: WriteHalf<'_>,
|
||||||
T: 'static + Send,
|
mut request_rx: mpsc::UnboundedReceiver<TransportMessage>,
|
||||||
{
|
) -> Result<Infallible, IpcError> {
|
||||||
let f = async move {
|
use TransportMessage::*;
|
||||||
let mut read_buffer = Vec::new();
|
|
||||||
loop {
|
while let Some(msg) = request_rx.next().await {
|
||||||
let closed = self.process(&mut read_buffer).await.expect("IPC Server panic");
|
match msg {
|
||||||
if closed && self.pending.is_empty() {
|
Request { id, request, sender } => {
|
||||||
break
|
let prev = self.pending.borrow_mut().insert(id, sender);
|
||||||
|
assert!(prev.is_none(), "replaced pending IPC request (id={})", id);
|
||||||
|
|
||||||
|
if let Err(err) = writer.write_all(&request).await {
|
||||||
|
tracing::error!("IPC connection error: {:?}", err);
|
||||||
|
self.pending.borrow_mut().remove(&id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Subscribe { id, sink } => {
|
||||||
|
if self.subs.borrow_mut().insert(id, sink).is_some() {
|
||||||
|
tracing::warn!(
|
||||||
|
%id,
|
||||||
|
"replaced already-registered subscription"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Unsubscribe { id } => {
|
||||||
|
if self.subs.borrow_mut().remove(&id).is_none() {
|
||||||
|
tracing::warn!(
|
||||||
|
%id,
|
||||||
|
"attempted to unsubscribe from non-existent subscription"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
tokio::spawn(f);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Processes 1 item selected from the incoming `requests` or `socket`
|
|
||||||
#[allow(clippy::single_match)]
|
|
||||||
async fn process(&mut self, read_buffer: &mut Vec<u8>) -> Result<bool, IpcError> {
|
|
||||||
futures_util::select! {
|
|
||||||
// Handle requests
|
|
||||||
msg = self.requests.next() => match msg {
|
|
||||||
Some(msg) => self.handle_request(msg).await?,
|
|
||||||
None => return Ok(true),
|
|
||||||
},
|
|
||||||
// Handle socket messages
|
|
||||||
msg = self.socket_reader.next() => match msg {
|
|
||||||
Some(Ok(msg)) => self.handle_socket(read_buffer, msg)?,
|
|
||||||
Some(Err(err)) => {
|
|
||||||
error!("IPC read error: {:?}", err);
|
|
||||||
return Err(err.into());
|
|
||||||
},
|
|
||||||
None => {},
|
|
||||||
},
|
|
||||||
// finished
|
|
||||||
complete => {},
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_request(&mut self, msg: TransportMessage) -> Result<(), IpcError> {
|
|
||||||
match msg {
|
|
||||||
TransportMessage::Request { id, request, sender } => {
|
|
||||||
if self.pending.insert(id, sender).is_some() {
|
|
||||||
warn!("Replacing a pending request with id {:?}", id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(err) = self.socket_writer.write(request.as_bytes()).await {
|
|
||||||
error!("IPC connection error: {:?}", err);
|
|
||||||
self.pending.remove(&id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
TransportMessage::Subscribe { id, sink } => {
|
|
||||||
if self.subscriptions.insert(id, sink).is_some() {
|
|
||||||
warn!("Replacing already-registered subscription with id {:?}", id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
TransportMessage::Unsubscribe { id } => {
|
|
||||||
if self.subscriptions.remove(&id).is_none() {
|
|
||||||
warn!("Unsubscribing from non-existent subscription with id {:?}", id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_socket(
|
|
||||||
&mut self,
|
|
||||||
read_buffer: &mut Vec<u8>,
|
|
||||||
bytes: bytes::Bytes,
|
|
||||||
) -> Result<(), IpcError> {
|
|
||||||
// Extend buffer of previously unread with the new read bytes
|
|
||||||
read_buffer.extend_from_slice(&bytes);
|
|
||||||
// Deserialize as many full elements from the stream as exists
|
|
||||||
let mut de: StreamDeserializer<_, &RawValue> =
|
|
||||||
Deserializer::from_slice(read_buffer).into_iter();
|
|
||||||
// Iterate through these elements, and handle responses/notifications
|
|
||||||
while let Some(Ok(raw)) = de.next() {
|
|
||||||
if let Ok(response) = serde_json::from_str(raw.get()) {
|
|
||||||
// Send notify response if okay.
|
|
||||||
if let Err(e) = self.respond(response) {
|
|
||||||
error!(err = %e, "Failed to send IPC response");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Ok(notification) = serde_json::from_str(raw.get()) {
|
|
||||||
// Send notify response if okay.
|
|
||||||
if let Err(e) = self.notify(notification) {
|
|
||||||
error!(err = %e, "Failed to send IPC notification");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
warn!("JSON from IPC stream is not a response or notification");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the offset of bytes to handle partial buffer reads
|
// the request receiver will only be closed if the sender instance
|
||||||
let read_len = de.byte_offset();
|
// located within the transport handle is dropped, this is not truly an
|
||||||
|
// error but leads to the `try_join` in `run_ipc_server` to cancel the
|
||||||
|
// read half future
|
||||||
|
Err(IpcError::ServerExit)
|
||||||
|
}
|
||||||
|
|
||||||
// Reset buffer to just include the partial value bytes.
|
fn handle_bytes(&self, bytes: &BytesMut) -> Result<usize, IpcError> {
|
||||||
read_buffer.copy_within(read_len.., 0);
|
// deserialize all complete jsonrpc responses in the buffer
|
||||||
read_buffer.truncate(read_buffer.len() - read_len);
|
let mut de = Deserializer::from_slice(bytes.as_ref()).into_iter();
|
||||||
|
while let Some(Ok(response)) = de.next() {
|
||||||
|
match response {
|
||||||
|
Response::Success { id, result } => self.send_response(id, Ok(result.to_owned())),
|
||||||
|
Response::Error { id, error } => self.send_response(id, Err(error)),
|
||||||
|
Response::Notification { params, .. } => self.send_notification(params),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(de.byte_offset())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_response(&self, id: u64, result: Result<Box<RawValue>, JsonRpcError>) {
|
||||||
|
// retrieve the channel sender for responding to the pending request
|
||||||
|
let response_tx = match self.pending.borrow_mut().remove(&id) {
|
||||||
|
Some(tx) => tx,
|
||||||
|
None => {
|
||||||
|
tracing::warn!(%id, "no pending request exists for the response ID");
|
||||||
|
return
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// a failure to send the response indicates that the pending request has
|
||||||
|
// been dropped in the mean time
|
||||||
|
let _ = response_tx.send(result.map_err(Into::into));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends notification through the channel based on the ID of the subscription.
|
/// Sends notification through the channel based on the ID of the subscription.
|
||||||
/// This handles streaming responses.
|
/// This handles streaming responses.
|
||||||
fn notify(&mut self, notification: Notification<'_>) -> Result<(), IpcError> {
|
fn send_notification(&self, params: Params<'_>) {
|
||||||
let id = notification.params.subscription;
|
// retrieve the channel sender for notifying the subscription stream
|
||||||
if let Some(tx) = self.subscriptions.get(&id) {
|
let subs = self.subs.borrow();
|
||||||
tx.unbounded_send(notification.params.result.to_owned()).map_err(|_| {
|
let tx = match subs.get(¶ms.subscription) {
|
||||||
IpcError::ChannelError(format!("Subscription receiver {} dropped", id))
|
Some(tx) => tx,
|
||||||
})?;
|
None => {
|
||||||
}
|
tracing::warn!(
|
||||||
|
id = ?params.subscription,
|
||||||
|
"no subscription exists for the notification ID"
|
||||||
|
);
|
||||||
|
return
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
Ok(())
|
// a failure to send the response indicates that the pending request has
|
||||||
}
|
// been dropped in the mean time (and should have been unsubscribed!)
|
||||||
|
let _ = tx.unbounded_send(params.result.to_owned());
|
||||||
/// Sends JSON response through the channel based on the ID in that response.
|
|
||||||
/// This handles RPC calls with only one response, and the channel entry is dropped after
|
|
||||||
/// sending.
|
|
||||||
fn respond(&mut self, response: Response<'_>) -> Result<(), IpcError> {
|
|
||||||
let id = response.id();
|
|
||||||
let res = response.into_result();
|
|
||||||
|
|
||||||
let response_tx = self.pending.remove(&id).ok_or_else(|| {
|
|
||||||
IpcError::ChannelError("No response channel exists for the response ID".to_string())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
response_tx.send(res).map_err(|_| {
|
|
||||||
IpcError::ChannelError("Receiver channel for response has been dropped".to_string())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,7 +308,10 @@ pub enum IpcError {
|
||||||
ChannelError(String),
|
ChannelError(String),
|
||||||
|
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Canceled(#[from] RecvError),
|
RequestCancelled(#[from] RecvError),
|
||||||
|
|
||||||
|
#[error("The IPC server has exited")]
|
||||||
|
ServerExit,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<IpcError> for ProviderError {
|
impl From<IpcError> for ProviderError {
|
||||||
|
|
|
@ -11,10 +11,7 @@ use futures_util::{
|
||||||
sink::{Sink, SinkExt},
|
sink::{Sink, SinkExt},
|
||||||
stream::{Fuse, Stream, StreamExt},
|
stream::{Fuse, Stream, StreamExt},
|
||||||
};
|
};
|
||||||
use serde::{
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
de::{DeserializeOwned, Error},
|
|
||||||
Serialize,
|
|
||||||
};
|
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{btree_map::Entry, BTreeMap},
|
collections::{btree_map::Entry, BTreeMap},
|
||||||
|
@ -26,7 +23,7 @@ use std::{
|
||||||
};
|
};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
use super::common::{Notification, Response};
|
use super::common::{Params, Response};
|
||||||
|
|
||||||
if_wasm! {
|
if_wasm! {
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
|
@ -320,35 +317,34 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> {
|
async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> {
|
||||||
if let Ok(response) = serde_json::from_str::<Response<'_>>(&inner) {
|
let (id, result) = match serde_json::from_str(&inner)? {
|
||||||
if let Some(request) = self.pending.remove(&response.id()) {
|
Response::Success { id, result } => (id, Ok(result.to_owned())),
|
||||||
if !request.is_canceled() {
|
Response::Error { id, error } => (id, Err(error)),
|
||||||
request.send(response.into_result()).map_err(to_client_error)?;
|
Response::Notification { params, .. } => return self.handle_notification(params),
|
||||||
}
|
};
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(())
|
if let Some(request) = self.pending.remove(&id) {
|
||||||
|
if !request.is_canceled() {
|
||||||
|
request.send(result).map_err(to_client_error)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Ok(notification) = serde_json::from_str::<Notification<'_>>(&inner) {
|
Ok(())
|
||||||
let id = notification.params.subscription;
|
}
|
||||||
if let Entry::Occupied(stream) = self.subscriptions.entry(id) {
|
|
||||||
if let Err(err) = stream.get().unbounded_send(notification.params.result.to_owned())
|
|
||||||
{
|
|
||||||
if err.is_disconnected() {
|
|
||||||
// subscription channel was closed on the receiver end
|
|
||||||
stream.remove();
|
|
||||||
}
|
|
||||||
return Err(to_client_error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(())
|
fn handle_notification(&mut self, params: Params<'_>) -> Result<(), ClientError> {
|
||||||
|
let id = params.subscription;
|
||||||
|
if let Entry::Occupied(stream) = self.subscriptions.entry(id) {
|
||||||
|
if let Err(err) = stream.get().unbounded_send(params.result.to_owned()) {
|
||||||
|
if err.is_disconnected() {
|
||||||
|
// subscription channel was closed on the receiver end
|
||||||
|
stream.remove();
|
||||||
|
}
|
||||||
|
return Err(to_client_error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(ClientError::JsonError(serde_json::Error::custom(
|
Ok(())
|
||||||
"response is neither a valid jsonrpc response nor notification",
|
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_arch = "wasm32")]
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
|
Loading…
Reference in New Issue