2020-11-30 09:33:06 +00:00
|
|
|
use crate::{
|
|
|
|
provider::ProviderError,
|
|
|
|
transports::common::{JsonRpcError, Notification, Request, Response},
|
|
|
|
JsonRpcClient, PubsubClient,
|
|
|
|
};
|
|
|
|
use ethers_core::types::U256;
|
2020-06-21 07:17:11 +00:00
|
|
|
|
|
|
|
use async_trait::async_trait;
|
2020-11-30 09:33:06 +00:00
|
|
|
use futures_channel::{mpsc, oneshot};
|
2020-06-21 07:17:11 +00:00
|
|
|
use futures_util::{
|
|
|
|
sink::{Sink, SinkExt},
|
2020-11-30 09:33:06 +00:00
|
|
|
stream::{Fuse, Stream, StreamExt},
|
|
|
|
};
|
|
|
|
use serde::{de::DeserializeOwned, Serialize};
|
|
|
|
use std::{
|
2021-10-29 12:29:35 +00:00
|
|
|
collections::{btree_map::Entry, BTreeMap},
|
2020-11-30 09:33:06 +00:00
|
|
|
fmt::{self, Debug},
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicU64, Ordering},
|
|
|
|
Arc,
|
|
|
|
},
|
2020-06-21 07:17:11 +00:00
|
|
|
};
|
|
|
|
use thiserror::Error;
|
2021-08-23 09:56:44 +00:00
|
|
|
|
|
|
|
if_wasm! {
|
|
|
|
use wasm_bindgen::prelude::*;
|
|
|
|
use wasm_bindgen_futures::spawn_local;
|
|
|
|
use ws_stream_wasm::*;
|
|
|
|
|
|
|
|
type Message = WsMessage;
|
|
|
|
type WsError = ws_stream_wasm::WsErr;
|
|
|
|
type WsStreamItem = Message;
|
|
|
|
|
|
|
|
macro_rules! error {
|
|
|
|
( $( $t:tt )* ) => {
|
|
|
|
web_sys::console::error_1(&format!( $( $t )* ).into());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
macro_rules! warn {
|
|
|
|
( $( $t:tt )* ) => {
|
|
|
|
web_sys::console::warn_1(&format!( $( $t )* ).into());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
macro_rules! debug {
|
|
|
|
( $( $t:tt )* ) => {
|
|
|
|
web_sys::console::log_1(&format!( $( $t )* ).into());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if_not_wasm! {
|
|
|
|
use tokio_tungstenite::{
|
|
|
|
connect_async,
|
|
|
|
tungstenite::{
|
|
|
|
self,
|
|
|
|
protocol::CloseFrame,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
type Message = tungstenite::protocol::Message;
|
|
|
|
type WsError = tungstenite::Error;
|
|
|
|
type WsStreamItem = Result<Message, WsError>;
|
2022-01-27 10:04:53 +00:00
|
|
|
use super::Authorization;
|
2021-08-23 09:56:44 +00:00
|
|
|
use tracing::{debug, error, warn};
|
2022-01-27 10:04:53 +00:00
|
|
|
use http::Request as HttpRequest;
|
2022-03-10 21:36:42 +00:00
|
|
|
use tungstenite::client::IntoClientRequest;
|
2021-08-23 09:56:44 +00:00
|
|
|
}
|
2020-06-21 07:17:11 +00:00
|
|
|
|
2021-08-20 17:23:39 +00:00
|
|
|
type Pending = oneshot::Sender<Result<serde_json::Value, JsonRpcError>>;
|
|
|
|
type Subscription = mpsc::UnboundedSender<serde_json::Value>;
|
|
|
|
|
|
|
|
/// Instructions for the `WsServer`.
|
|
|
|
enum Instruction {
|
|
|
|
/// JSON-RPC request
|
2021-10-29 12:29:35 +00:00
|
|
|
Request { id: u64, request: String, sender: Pending },
|
2021-08-20 17:23:39 +00:00
|
|
|
/// Create a new subscription
|
|
|
|
Subscribe { id: U256, sink: Subscription },
|
|
|
|
/// Cancel an existing subscription
|
|
|
|
Unsubscribe { id: U256 },
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, serde::Deserialize)]
|
|
|
|
#[serde(untagged)]
|
|
|
|
enum Incoming {
|
|
|
|
Notification(Notification<serde_json::Value>),
|
|
|
|
Response(Response<serde_json::Value>),
|
|
|
|
}
|
|
|
|
|
2020-06-21 07:17:11 +00:00
|
|
|
/// A JSON-RPC Client over Websockets.
|
|
|
|
///
|
|
|
|
/// ```no_run
|
|
|
|
/// # async fn foo() -> Result<(), Box<dyn std::error::Error>> {
|
2021-08-28 21:06:29 +00:00
|
|
|
/// use ethers_providers::Ws;
|
2020-06-21 07:17:11 +00:00
|
|
|
///
|
|
|
|
/// let ws = Ws::connect("wss://localhost:8545").await?;
|
|
|
|
/// # Ok(())
|
|
|
|
/// # }
|
|
|
|
/// ```
|
2020-11-30 09:33:06 +00:00
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct Ws {
|
|
|
|
id: Arc<AtomicU64>,
|
2021-08-20 17:23:39 +00:00
|
|
|
instructions: mpsc::UnboundedSender<Instruction>,
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
impl Debug for Ws {
|
2020-09-24 21:33:09 +00:00
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
2021-10-29 12:29:35 +00:00
|
|
|
f.debug_struct("WebsocketProvider").field("id", &self.id).finish()
|
2020-09-24 21:33:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
impl Ws {
|
|
|
|
/// Initializes a new WebSocket Client, given a Stream/Sink Websocket implementer.
|
|
|
|
/// The websocket connection must be initiated separately.
|
|
|
|
pub fn new<S: 'static>(ws: S) -> Self
|
|
|
|
where
|
2021-08-23 09:56:44 +00:00
|
|
|
S: Send + Sync + Stream<Item = WsStreamItem> + Sink<Message, Error = WsError> + Unpin,
|
2020-11-30 09:33:06 +00:00
|
|
|
{
|
|
|
|
let (sink, stream) = mpsc::unbounded();
|
|
|
|
// Spawn the server
|
|
|
|
WsServer::new(ws, stream).spawn();
|
|
|
|
|
2021-10-29 12:29:35 +00:00
|
|
|
Self { id: Arc::new(AtomicU64::new(0)), instructions: sink }
|
2020-11-30 09:33:06 +00:00
|
|
|
}
|
|
|
|
|
2021-08-20 16:58:13 +00:00
|
|
|
/// Returns true if the WS connection is active, false otherwise
|
|
|
|
pub fn ready(&self) -> bool {
|
2021-08-20 17:23:39 +00:00
|
|
|
!self.instructions.is_closed()
|
2021-08-20 16:58:13 +00:00
|
|
|
}
|
|
|
|
|
2020-12-31 17:19:14 +00:00
|
|
|
/// Initializes a new WebSocket Client
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
pub async fn connect(url: &str) -> Result<Self, ClientError> {
|
2021-10-29 12:29:35 +00:00
|
|
|
let (_, wsio) = WsMeta::connect(url, None).await.expect_throw("Could not create websocket");
|
2021-08-23 09:56:44 +00:00
|
|
|
|
|
|
|
Ok(Self::new(wsio))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Initializes a new WebSocket Client
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2022-03-10 21:36:42 +00:00
|
|
|
pub async fn connect(url: impl IntoClientRequest + Unpin) -> Result<Self, ClientError> {
|
2020-06-21 07:17:11 +00:00
|
|
|
let (ws, _) = connect_async(url).await?;
|
|
|
|
Ok(Self::new(ws))
|
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
|
2022-01-27 10:04:53 +00:00
|
|
|
/// Initializes a new WebSocket Client with authentication
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
pub async fn connect_with_auth(
|
2022-03-10 21:36:42 +00:00
|
|
|
uri: impl IntoClientRequest + Unpin,
|
2022-01-27 10:04:53 +00:00
|
|
|
auth: Authorization,
|
|
|
|
) -> Result<Self, ClientError> {
|
2022-03-10 21:36:42 +00:00
|
|
|
let mut request: HttpRequest<()> = uri.into_client_request()?;
|
2022-01-27 10:04:53 +00:00
|
|
|
|
|
|
|
let mut auth_value = http::HeaderValue::from_str(&auth.to_string())?;
|
|
|
|
auth_value.set_sensitive(true);
|
|
|
|
|
|
|
|
request.headers_mut().insert(http::header::AUTHORIZATION, auth_value);
|
|
|
|
Self::connect(request).await
|
|
|
|
}
|
|
|
|
|
2021-08-20 17:23:39 +00:00
|
|
|
fn send(&self, msg: Instruction) -> Result<(), ClientError> {
|
2021-10-29 12:29:35 +00:00
|
|
|
self.instructions.unbounded_send(msg).map_err(to_client_error)
|
2020-11-30 09:33:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
|
|
|
|
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
|
2020-11-30 09:33:06 +00:00
|
|
|
impl JsonRpcClient for Ws {
|
|
|
|
type Error = ClientError;
|
|
|
|
|
|
|
|
async fn request<T: Serialize + Send + Sync, R: DeserializeOwned>(
|
|
|
|
&self,
|
|
|
|
method: &str,
|
|
|
|
params: T,
|
|
|
|
) -> Result<R, ClientError> {
|
2021-03-02 22:19:20 +00:00
|
|
|
let next_id = self.id.fetch_add(1, Ordering::SeqCst);
|
2020-11-30 09:33:06 +00:00
|
|
|
|
|
|
|
// send the message
|
|
|
|
let (sender, receiver) = oneshot::channel();
|
2021-08-20 17:23:39 +00:00
|
|
|
let payload = Instruction::Request {
|
2020-11-30 09:33:06 +00:00
|
|
|
id: next_id,
|
|
|
|
request: serde_json::to_string(&Request::new(next_id, method, params))?,
|
|
|
|
sender,
|
|
|
|
};
|
|
|
|
|
|
|
|
// send the data
|
2021-08-20 17:23:39 +00:00
|
|
|
self.send(payload)?;
|
2020-11-30 09:33:06 +00:00
|
|
|
|
|
|
|
// wait for the response
|
|
|
|
let res = receiver.await?;
|
|
|
|
|
2021-06-12 07:46:19 +00:00
|
|
|
// in case the request itself has any errors
|
|
|
|
let res = res?;
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
// parse it
|
|
|
|
Ok(serde_json::from_value(res)?)
|
|
|
|
}
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
impl PubsubClient for Ws {
|
|
|
|
type NotificationStream = mpsc::UnboundedReceiver<serde_json::Value>;
|
|
|
|
|
|
|
|
fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, ClientError> {
|
|
|
|
let (sink, stream) = mpsc::unbounded();
|
2021-10-29 12:29:35 +00:00
|
|
|
self.send(Instruction::Subscribe { id: id.into(), sink })?;
|
2020-11-30 09:33:06 +00:00
|
|
|
Ok(stream)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), ClientError> {
|
2021-08-20 17:23:39 +00:00
|
|
|
self.send(Instruction::Unsubscribe { id: id.into() })
|
2020-11-30 09:33:06 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct WsServer<S> {
|
|
|
|
ws: Fuse<S>,
|
2021-08-20 17:23:39 +00:00
|
|
|
instructions: Fuse<mpsc::UnboundedReceiver<Instruction>>,
|
2020-11-30 09:33:06 +00:00
|
|
|
|
|
|
|
pending: BTreeMap<u64, Pending>,
|
|
|
|
subscriptions: BTreeMap<U256, Subscription>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> WsServer<S>
|
2020-06-21 07:17:11 +00:00
|
|
|
where
|
2021-08-23 09:56:44 +00:00
|
|
|
S: Send + Sync + Stream<Item = WsStreamItem> + Sink<Message, Error = WsError> + Unpin,
|
2020-06-21 07:17:11 +00:00
|
|
|
{
|
2020-11-30 09:33:06 +00:00
|
|
|
/// Instantiates the Websocket Server
|
2021-08-20 17:23:39 +00:00
|
|
|
fn new(ws: S, requests: mpsc::UnboundedReceiver<Instruction>) -> Self {
|
2020-06-21 07:17:11 +00:00
|
|
|
Self {
|
2020-11-30 09:33:06 +00:00
|
|
|
// Fuse the 2 steams together, so that we can `select` them in the
|
|
|
|
// Stream implementation
|
|
|
|
ws: ws.fuse(),
|
2021-08-20 17:23:39 +00:00
|
|
|
instructions: requests.fuse(),
|
2020-11-30 09:33:06 +00:00
|
|
|
pending: BTreeMap::default(),
|
|
|
|
subscriptions: BTreeMap::default(),
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
|
2021-08-22 15:47:44 +00:00
|
|
|
/// Returns whether the all work has been completed.
|
|
|
|
///
|
|
|
|
/// If this method returns `true`, then the `instructions` channel has been closed and all
|
|
|
|
/// pending requests and subscriptions have been completed.
|
|
|
|
fn is_done(&self) -> bool {
|
|
|
|
self.instructions.is_done() && self.pending.is_empty() && self.subscriptions.is_empty()
|
|
|
|
}
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
/// Spawns the event loop
|
|
|
|
fn spawn(mut self)
|
|
|
|
where
|
|
|
|
S: 'static,
|
|
|
|
{
|
|
|
|
let f = async move {
|
|
|
|
loop {
|
2021-08-22 15:47:44 +00:00
|
|
|
if self.is_done() {
|
2021-08-23 09:56:44 +00:00
|
|
|
debug!("work complete");
|
2021-10-29 12:29:35 +00:00
|
|
|
break
|
2021-08-22 15:47:44 +00:00
|
|
|
}
|
2021-08-20 17:23:39 +00:00
|
|
|
match self.tick().await {
|
2021-08-20 16:58:13 +00:00
|
|
|
Err(ClientError::UnexpectedClose) => {
|
2021-08-23 09:56:44 +00:00
|
|
|
error!("{}", ClientError::UnexpectedClose);
|
2021-10-29 12:29:35 +00:00
|
|
|
break
|
2021-08-20 16:58:13 +00:00
|
|
|
}
|
2021-08-20 17:23:39 +00:00
|
|
|
Err(e) => {
|
|
|
|
panic!("WS Server panic: {}", e);
|
2021-08-20 16:58:13 +00:00
|
|
|
}
|
|
|
|
_ => {}
|
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
spawn_local(f);
|
|
|
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2020-11-30 09:33:06 +00:00
|
|
|
tokio::spawn(f);
|
|
|
|
}
|
|
|
|
|
2021-08-20 17:23:39 +00:00
|
|
|
// dispatch an RPC request
|
|
|
|
async fn service_request(
|
|
|
|
&mut self,
|
|
|
|
id: u64,
|
|
|
|
request: String,
|
|
|
|
sender: Pending,
|
|
|
|
) -> Result<(), ClientError> {
|
|
|
|
if self.pending.insert(id, sender).is_some() {
|
|
|
|
warn!("Replacing a pending request with id {:?}", id);
|
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
|
2021-08-20 17:23:39 +00:00
|
|
|
if let Err(e) = self.ws.send(Message::Text(request)).await {
|
|
|
|
error!("WS connection error: {:?}", e);
|
|
|
|
self.pending.remove(&id);
|
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-08-20 17:23:39 +00:00
|
|
|
/// Dispatch a subscription request
|
|
|
|
async fn service_subscribe(&mut self, id: U256, sink: Subscription) -> Result<(), ClientError> {
|
|
|
|
if self.subscriptions.insert(id, sink).is_some() {
|
|
|
|
warn!("Replacing already-registered subscription with id {:?}", id);
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Dispatch a unsubscribe request
|
|
|
|
async fn service_unsubscribe(&mut self, id: U256) -> Result<(), ClientError> {
|
|
|
|
if self.subscriptions.remove(&id).is_none() {
|
2021-10-29 12:29:35 +00:00
|
|
|
warn!("Unsubscribing from non-existent subscription with id {:?}", id);
|
2021-08-20 17:23:39 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Dispatch an outgoing message
|
|
|
|
async fn service(&mut self, instruction: Instruction) -> Result<(), ClientError> {
|
|
|
|
match instruction {
|
2021-10-29 12:29:35 +00:00
|
|
|
Instruction::Request { id, request, sender } => {
|
|
|
|
self.service_request(id, request, sender).await
|
|
|
|
}
|
2021-08-20 17:23:39 +00:00
|
|
|
Instruction::Subscribe { id, sink } => self.service_subscribe(id, sink).await,
|
|
|
|
Instruction::Unsubscribe { id } => self.service_unsubscribe(id).await,
|
|
|
|
}
|
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2021-08-20 17:23:39 +00:00
|
|
|
async fn handle_ping(&mut self, inner: Vec<u8>) -> Result<(), ClientError> {
|
|
|
|
self.ws.send(Message::Pong(inner)).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_text(&mut self, inner: String) -> Result<(), ClientError> {
|
|
|
|
match serde_json::from_str::<Incoming>(&inner) {
|
2022-01-24 13:38:00 +00:00
|
|
|
Err(err) => return Err(ClientError::JsonError(err)),
|
|
|
|
|
2021-08-20 17:23:39 +00:00
|
|
|
Ok(Incoming::Response(resp)) => {
|
|
|
|
if let Some(request) = self.pending.remove(&resp.id) {
|
2021-12-03 18:14:13 +00:00
|
|
|
if !request.is_canceled() {
|
|
|
|
request.send(resp.data.into_result()).map_err(to_client_error)?;
|
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
}
|
|
|
|
}
|
2021-08-20 17:23:39 +00:00
|
|
|
Ok(Incoming::Notification(notification)) => {
|
|
|
|
let id = notification.params.subscription;
|
2021-08-22 15:47:44 +00:00
|
|
|
if let Entry::Occupied(stream) = self.subscriptions.entry(id) {
|
|
|
|
if let Err(err) = stream.get().unbounded_send(notification.params.result) {
|
|
|
|
if err.is_disconnected() {
|
|
|
|
// subscription channel was closed on the receiver end
|
|
|
|
stream.remove();
|
|
|
|
}
|
2021-10-29 12:29:35 +00:00
|
|
|
return Err(to_client_error(err))
|
2021-08-22 15:47:44 +00:00
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
}
|
|
|
|
}
|
2021-08-20 17:23:39 +00:00
|
|
|
}
|
2020-11-30 09:33:06 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
async fn handle(&mut self, resp: Message) -> Result<(), ClientError> {
|
|
|
|
match resp {
|
|
|
|
Message::Text(inner) => self.handle_text(inner).await,
|
|
|
|
Message::Binary(buf) => Err(ClientError::UnexpectedBinary(buf)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2021-08-20 17:23:39 +00:00
|
|
|
async fn handle(&mut self, resp: Message) -> Result<(), ClientError> {
|
2021-01-15 09:33:38 +00:00
|
|
|
match resp {
|
|
|
|
Message::Text(inner) => self.handle_text(inner).await,
|
2022-02-28 08:40:42 +00:00
|
|
|
Message::Frame(_) => Ok(()), // Server is allowed to send Raw frames
|
2021-01-15 09:33:38 +00:00
|
|
|
Message::Ping(inner) => self.handle_ping(inner).await,
|
|
|
|
Message::Pong(_) => Ok(()), // Server is allowed to send unsolicited pongs.
|
2021-08-20 16:58:13 +00:00
|
|
|
Message::Close(Some(frame)) => Err(ClientError::WsClosed(frame)),
|
|
|
|
Message::Close(None) => Err(ClientError::UnexpectedClose),
|
|
|
|
Message::Binary(buf) => Err(ClientError::UnexpectedBinary(buf)),
|
2021-01-15 09:33:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-20 17:23:39 +00:00
|
|
|
/// Processes 1 instruction or 1 incoming websocket message
|
|
|
|
#[allow(clippy::single_match)]
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
async fn tick(&mut self) -> Result<(), ClientError> {
|
|
|
|
futures_util::select! {
|
|
|
|
// Handle requests
|
|
|
|
instruction = self.instructions.select_next_some() => {
|
|
|
|
self.service(instruction).await?;
|
|
|
|
},
|
|
|
|
// Handle ws messages
|
|
|
|
resp = self.ws.next() => match resp {
|
|
|
|
Some(resp) => self.handle(resp).await?,
|
|
|
|
None => {
|
|
|
|
return Err(ClientError::UnexpectedClose);
|
|
|
|
},
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Processes 1 instruction or 1 incoming websocket message
|
|
|
|
#[allow(clippy::single_match)]
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2021-08-20 17:23:39 +00:00
|
|
|
async fn tick(&mut self) -> Result<(), ClientError> {
|
|
|
|
futures_util::select! {
|
|
|
|
// Handle requests
|
|
|
|
instruction = self.instructions.select_next_some() => {
|
|
|
|
self.service(instruction).await?;
|
|
|
|
},
|
|
|
|
// Handle ws messages
|
|
|
|
resp = self.ws.next() => match resp {
|
|
|
|
Some(Ok(resp)) => self.handle(resp).await?,
|
|
|
|
// TODO: Log the error?
|
|
|
|
Some(Err(_)) => {},
|
|
|
|
None => {
|
|
|
|
return Err(ClientError::UnexpectedClose);
|
|
|
|
},
|
2020-11-30 09:33:06 +00:00
|
|
|
}
|
2021-08-20 17:23:39 +00:00
|
|
|
};
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// TrySendError is private :(
|
2021-06-12 07:46:19 +00:00
|
|
|
fn to_client_error<T: Debug>(err: T) -> ClientError {
|
|
|
|
ClientError::ChannelError(format!("{:?}", err))
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Error, Debug)]
|
|
|
|
/// Error thrown when sending a WS message
|
|
|
|
pub enum ClientError {
|
|
|
|
/// Thrown if deserialization failed
|
|
|
|
#[error(transparent)]
|
|
|
|
JsonError(#[from] serde_json::Error),
|
|
|
|
|
|
|
|
#[error(transparent)]
|
|
|
|
/// Thrown if the response could not be parsed
|
|
|
|
JsonRpcError(#[from] JsonRpcError),
|
|
|
|
|
2021-08-20 16:58:13 +00:00
|
|
|
/// Thrown if the websocket responds with binary data
|
|
|
|
#[error("Websocket responded with unexpected binary data")]
|
|
|
|
UnexpectedBinary(Vec<u8>),
|
2020-06-21 07:17:11 +00:00
|
|
|
|
|
|
|
/// Thrown if there's an error over the WS connection
|
|
|
|
#[error(transparent)]
|
2021-08-23 09:56:44 +00:00
|
|
|
TungsteniteError(#[from] WsError),
|
2020-11-30 09:33:06 +00:00
|
|
|
|
|
|
|
#[error("{0}")]
|
|
|
|
ChannelError(String),
|
|
|
|
|
|
|
|
#[error(transparent)]
|
|
|
|
Canceled(#[from] oneshot::Canceled),
|
2021-08-20 16:58:13 +00:00
|
|
|
|
|
|
|
/// Remote server sent a Close message
|
|
|
|
#[error("Websocket closed with info: {0:?}")]
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2021-08-20 16:58:13 +00:00
|
|
|
WsClosed(CloseFrame<'static>),
|
|
|
|
|
2021-08-23 09:56:44 +00:00
|
|
|
/// Remote server sent a Close message
|
|
|
|
#[error("Websocket closed with info")]
|
|
|
|
#[cfg(target_arch = "wasm32")]
|
|
|
|
WsClosed,
|
|
|
|
|
2021-08-20 16:58:13 +00:00
|
|
|
/// Something caused the websocket to close
|
|
|
|
#[error("WebSocket connection closed unexpectedly")]
|
|
|
|
UnexpectedClose,
|
2022-01-27 10:04:53 +00:00
|
|
|
|
|
|
|
/// Could not create an auth header for websocket handshake
|
|
|
|
#[error(transparent)]
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
WsAuth(#[from] http::header::InvalidHeaderValue),
|
|
|
|
|
|
|
|
/// Unable to create a valid Uri
|
|
|
|
#[error(transparent)]
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
UriError(#[from] http::uri::InvalidUri),
|
|
|
|
|
|
|
|
/// Unable to create a valid Request
|
|
|
|
#[error(transparent)]
|
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
|
|
|
RequestError(#[from] http::Error),
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl From<ClientError> for ProviderError {
|
|
|
|
fn from(src: ClientError) -> Self {
|
|
|
|
ProviderError::JsonRpcClientError(Box::new(src))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
#[cfg(test)]
|
|
|
|
#[cfg(not(feature = "celo"))]
|
2021-08-23 09:56:44 +00:00
|
|
|
#[cfg(not(target_arch = "wasm32"))]
|
2020-11-30 09:33:06 +00:00
|
|
|
mod tests {
|
|
|
|
use super::*;
|
2021-10-29 12:29:35 +00:00
|
|
|
use ethers_core::{
|
|
|
|
types::{Block, TxHash, U256},
|
|
|
|
utils::Ganache,
|
|
|
|
};
|
2020-06-21 07:17:11 +00:00
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn request() {
|
|
|
|
let ganache = Ganache::new().block_time(1u64).spawn();
|
|
|
|
let ws = Ws::connect(ganache.ws_endpoint()).await.unwrap();
|
2020-06-21 07:17:11 +00:00
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
let block_num: U256 = ws.request("eth_blockNumber", ()).await.unwrap();
|
|
|
|
std::thread::sleep(std::time::Duration::new(3, 0));
|
|
|
|
let block_num2: U256 = ws.request("eth_blockNumber", ()).await.unwrap();
|
|
|
|
assert!(block_num2 > block_num);
|
|
|
|
}
|
2020-06-21 07:17:11 +00:00
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn subscription() {
|
|
|
|
let ganache = Ganache::new().block_time(1u64).spawn();
|
|
|
|
let ws = Ws::connect(ganache.ws_endpoint()).await.unwrap();
|
2020-06-21 07:17:11 +00:00
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
// Subscribing requires sending the sub request and then subscribing to
|
|
|
|
// the returned sub_id
|
|
|
|
let sub_id: U256 = ws.request("eth_subscribe", ["newHeads"]).await.unwrap();
|
|
|
|
let mut stream = ws.subscribe(sub_id).unwrap();
|
2020-06-21 07:17:11 +00:00
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
let mut blocks = Vec::new();
|
|
|
|
for _ in 0..3 {
|
|
|
|
let item = stream.next().await.unwrap();
|
|
|
|
let block = serde_json::from_value::<Block<TxHash>>(item).unwrap();
|
|
|
|
blocks.push(block.number.unwrap_or_default().as_u64());
|
|
|
|
}
|
2020-06-21 07:17:11 +00:00
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
assert_eq!(sub_id, 1.into());
|
|
|
|
assert_eq!(blocks, vec![1, 2, 3])
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|
2022-01-24 13:38:00 +00:00
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn deserialization_fails() {
|
|
|
|
let ganache = Ganache::new().block_time(1u64).spawn();
|
|
|
|
let (ws, _) = tokio_tungstenite::connect_async(ganache.ws_endpoint()).await.unwrap();
|
|
|
|
let malformed_data = String::from("not a valid message");
|
|
|
|
let (_, stream) = mpsc::unbounded();
|
|
|
|
let resp = WsServer::new(ws, stream).handle_text(malformed_data).await;
|
|
|
|
assert!(resp.is_err(), "Deserialization should not fail silently");
|
|
|
|
}
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|