WebSocket provider

The Ws provider lets you send JSON-RPC requests and receive responses over a WebSocket connection. It can be used with any Ethereum node that supports WebSockets. This allows programs to interact with the network in real time, without HTTP polling, for things like new block headers and filter logs. Ethers-rs supports WebSockets via Tokio. Make sure that the “ws” feature and one of the “rustls” or “openssl” features are enabled in your project's Cargo.toml file if you wish to use WebSockets.
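The feature flags mentioned above could be enabled with a manifest entry along these lines. This is only a sketch: the version numbers are assumptions, as are the tokio and eyre entries, which are listed because the examples on this page use them. Swap "rustls" for "openssl" if you prefer that TLS backend.

```toml
[dependencies]
# Version numbers below are assumptions; use whichever releases your project targets.
ethers = { version = "2.0", features = ["ws", "rustls"] }
# Assumed entries: the examples on this page use #[tokio::main] and eyre::Result.
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
eyre = "0.6"
```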

Initializing a WS Provider

Let's look at a few ways to create a new Ws provider. Below is the most straightforward way to initialize one.

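use ethers::providers::{Provider, Ws};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Replace with your node's WebSocket URL (ws:// or wss://)
    let ws_endpoint = "";
    let _provider = Provider::<Ws>::connect(ws_endpoint).await?;
    Ok(())
}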

As with the other providers, you can also establish an authorized connection to a node over WebSockets.

use ethers::providers::{Authorization, Provider, Ws};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let ws_endpoint = "";
    let auth = Authorization::basic("username", "password");

    // Pass the endpoint defined above together with the credentials
    if let Ok(_provider) = Provider::<Ws>::connect_with_auth(ws_endpoint, auth).await {
        println!("Created Ws provider with auth");
    }

    Ok(())
}
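For readers curious about what basic credentials look like on the wire, the following is a minimal, standard-library-only sketch of HTTP Basic authentication as described in RFC 7617. It assumes that Authorization::basic produces credentials in this standard form. The names base64_encode and basic_auth_header are hypothetical helpers written for illustration and are not part of ethers-rs; in practice the library builds the header for you.

```rust
/// Base64 alphabet from RFC 4648.
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Hypothetical helper: encodes bytes as padded base64.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into the top 24 bits of a u32.
        let mut n = 0u32;
        for (i, &b) in chunk.iter().enumerate() {
            n |= (b as u32) << (16 - 8 * i);
        }
        // Emit one character per 6-bit group that holds input bits; pad the rest with '='.
        for i in 0..4 {
            if i <= chunk.len() {
                let idx = (n >> (18 - 6 * i)) & 0x3f;
                out.push(ALPHABET[idx as usize] as char);
            } else {
                out.push('=');
            }
        }
    }
    out
}

/// Hypothetical helper: the value of an HTTP `Authorization` header for Basic auth,
/// i.e. "Basic " followed by base64("username:password").
fn basic_auth_header(username: &str, password: &str) -> String {
    let credentials = format!("{username}:{password}");
    format!("Basic {}", base64_encode(credentials.as_bytes()))
}

fn main() {
    // prints "Basic dXNlcm5hbWU6cGFzc3dvcmQ="
    println!("{}", basic_auth_header("username", "password"));
}
```

Because base64 is an encoding and not encryption, credentials sent this way are only protected when the endpoint uses a TLS connection (wss://).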

Usage

The Ws provider lets you send requests to the node just like the other providers. In addition, it can subscribe to new logs and events, watch transactions in the mempool, and consume other types of data streams from the node. The default polling interval for the Ws provider is 7 seconds. You can change it with the provider.interval() method, or with provider.set_interval() on an existing provider.
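To make the difference between subscribing and polling more concrete, here is a rough sketch of the JSON-RPC messages that could travel over the socket. json_rpc_request is a hypothetical helper, not part of ethers-rs, and it does no string escaping. The method names come from the Ethereum JSON-RPC API; the assumption is that subscriptions map to eth_subscribe, while watch-style streams install a filter and then poll it at the provider's interval. The filter id "0x1" is a made-up value.

```rust
/// Hypothetical helper: builds a JSON-RPC 2.0 request body whose params are plain strings.
/// It performs no escaping, so it is only suitable for illustration.
fn json_rpc_request(id: u64, method: &str, params: &[&str]) -> String {
    let params = params
        .iter()
        .map(|p| format!("\"{p}\""))
        .collect::<Vec<_>>()
        .join(",");
    format!("{{\"jsonrpc\":\"2.0\",\"id\":{id},\"method\":\"{method}\",\"params\":[{params}]}}")
}

fn main() {
    // Subscription: sent once, after which the node pushes each new header to the client.
    println!("{}", json_rpc_request(1, "eth_subscribe", &["newHeads"]));

    // Filter-based watching: install a filter once...
    println!("{}", json_rpc_request(2, "eth_newBlockFilter", &[]));
    // ...then ask for changes repeatedly, once per polling interval.
    // "0x1" is a made-up filter id standing in for the node's response.
    println!("{}", json_rpc_request(3, "eth_getFilterChanges", &["0x1"]));
}
```

Under these assumptions, the polling interval only matters for the filter-based requests, since a subscription delivers data as soon as the node emits it.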

In the snippet below, a new Ws provider is used to watch pending transactions in the mempool as well as new block headers in two separate Tokio tasks.

use ethers::providers::{Middleware, Provider, StreamExt, Ws};
use std::{sync::Arc, time::Duration};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let ws_endpoint = "";
    let mut provider = Provider::<Ws>::connect(ws_endpoint).await?;

    // Update the polling interval
    provider.set_interval(Duration::from_secs(3));

    // Wrap the provider in an Arc so each task owns its own handle to it
    let provider = Arc::new(provider);
    let provider_0 = provider.clone();
    let provider_1 = provider.clone();

    let mut handles = vec![];

    let pending_tx_handle = tokio::spawn(async move {
        let mut tx_pool_stream = provider_0.watch_pending_transactions().await?;
        while let Some(tx_hash) = tx_pool_stream.next().await {
            println!("Pending tx: {:?}", tx_hash);
        }
        // Give the async block an explicit Result type so `?` compiles
        Ok::<(), eyre::Report>(())
    });

    let new_block_headers_handle = tokio::spawn(async move {
        let mut new_block_headers_stream = provider_1.watch_blocks().await?;
        while let Some(block_hash) = new_block_headers_stream.next().await {
            println!("New block: {:?}", block_hash);
        }
        Ok::<(), eyre::Report>(())
    });

    // Add the JoinHandles to a vec and wait for the handles to complete
    handles.push(pending_tx_handle);
    handles.push(new_block_headers_handle);
    for handle in handles {
        // The first `?` surfaces a panicked or cancelled task, the second the task's own error
        handle.await??;
    }

    Ok(())
}