//! Create a custom data transport to use with a Provider. use async_trait::async_trait; use ethers::{core::utils::Anvil, prelude::*}; use serde::{de::DeserializeOwned, Serialize}; use std::fmt::Debug; use thiserror::Error; use url::Url; /// First we must create an error type, and implement [`From`] for [`ProviderError`]. /// /// Here we are using [`thiserror`](https://docs.rs/thiserror) to wrap [`WsClientError`] /// and [`IpcError`]. /// This also provides a conversion implementation ([`From`]) for both, so we can use /// the [question mark operator](https://doc.rust-lang.org/rust-by-example/std/result/question_mark.html) /// later on in our implementations. #[derive(Debug, Error)] pub enum WsOrIpcError { #[error(transparent)] Ws(#[from] WsClientError), #[error(transparent)] Ipc(#[from] IpcError), } impl From for ProviderError { fn from(value: WsOrIpcError) -> Self { Self::JsonRpcClientError(Box::new(value)) } } /// Next, we create our transport type, which in this case will be an enum that contains /// either [`Ws`] or [`Ipc`]. #[derive(Clone, Debug)] enum WsOrIpc { Ws(Ws), Ipc(Ipc), } // We implement a convenience "constructor" method, to easily initialize the transport. // This will connect to [`Ws`] if it's a valid [URL](url::Url), otherwise it'll // default to [`Ipc`]. impl WsOrIpc { pub async fn connect(s: &str) -> Result { let this = match Url::parse(s) { Ok(url) => Self::Ws(Ws::connect(url).await?), Err(_) => Self::Ipc(Ipc::connect(s).await?), }; Ok(this) } } // Next, the most important step: implement [`JsonRpcClient`]. // // For this implementation, we simply delegate to the wrapped transport and return the // result. // // Note that we are using [`async-trait`](https://docs.rs/async-trait) for asynchronous // functions in traits, as this is not yet supported in stable Rust; see: // #[async_trait] impl JsonRpcClient for WsOrIpc { type Error = WsOrIpcError; async fn request(&self, method: &str, params: T) -> Result where T: Debug + Serialize + Send + Sync, R: DeserializeOwned + Send, { let res = match self { Self::Ws(ws) => JsonRpcClient::request(ws, method, params).await?, Self::Ipc(ipc) => JsonRpcClient::request(ipc, method, params).await?, }; Ok(res) } } // We can also implement [`PubsubClient`], since both `Ws` and `Ipc` implement it, by // doing the same as in the `JsonRpcClient` implementation above. impl PubsubClient for WsOrIpc { // Since both `Ws` and `Ipc`'s `NotificationStream` associated type is the same, // we can simply return one of them. // In case they differed, we would have to create a `WsOrIpcNotificationStream`, // similar to the error type. type NotificationStream = ::NotificationStream; fn subscribe>(&self, id: T) -> Result { let stream = match self { Self::Ws(ws) => PubsubClient::subscribe(ws, id)?, Self::Ipc(ipc) => PubsubClient::subscribe(ipc, id)?, }; Ok(stream) } fn unsubscribe>(&self, id: T) -> Result<(), Self::Error> { match self { Self::Ws(ws) => PubsubClient::unsubscribe(ws, id)?, Self::Ipc(ipc) => PubsubClient::unsubscribe(ipc, id)?, }; Ok(()) } } #[tokio::main] async fn main() -> eyre::Result<()> { // Spawn Anvil let anvil = Anvil::new().block_time(1u64).spawn(); // Connect to our transport let transport = WsOrIpc::connect(&anvil.ws_endpoint()).await?; // Wrap the transport in a provider let provider = Provider::new(transport); // Now we can use our custom transport provider like normal let block_number = provider.get_block_number().await?; println!("Current block: {block_number}"); let mut subscription = provider.subscribe_blocks().await?.take(3); while let Some(block) = subscription.next().await { println!("New block: {:?}", block.number); } Ok(()) }