From b15d0f86438b31cb97f3acedecb9c6530d3d974c Mon Sep 17 00:00:00 2001 From: Meet Mangukiya Date: Mon, 16 May 2022 20:41:25 +0530 Subject: [PATCH] feat(providers): load previous logs before subscribing (#1264) * feat(providers): load previous logs before subscribing Load previous logs and stream it back to the user before establishing a new stream for streaming future logs. Closes #988 * docs: add subscribe_logs example * fix clippy errors * refactor: use VecDeque and address review --- CHANGELOG.md | 2 ++ ethers-providers/src/provider.rs | 27 ++++++++++++++++++++----- ethers-providers/src/pubsub.rs | 14 ++++++++++++- examples/subscribe_logs.rs | 34 ++++++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 6 deletions(-) create mode 100644 examples/subscribe_logs.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index bfe4d21b..b555aa62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -158,6 +158,8 @@ ### Unreleased +- Load previous logs before subscribing to new logs in case fromBlock is set + [1264](https://github.com/gakonst/ethers-rs/pull/1264) - Add retries to the pending transaction future [1221](https://github.com/gakonst/ethers-rs/pull/1221) - Add support for basic and bearer authentication in http and non-wasm websockets. diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs index 1bdfe200..ef287a53 100644 --- a/ethers-providers/src/provider.rs +++ b/ethers-providers/src/provider.rs @@ -16,9 +16,9 @@ use ethers_core::{ types::{ transaction::{eip2718::TypedTransaction, eip2930::AccessListWithGasUsed}, Address, Block, BlockId, BlockNumber, BlockTrace, Bytes, EIP1186ProofResponse, FeeHistory, - Filter, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter, TraceType, - Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent, TxpoolInspect, - TxpoolStatus, H256, U256, U64, + Filter, FilterBlockOption, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter, + TraceType, Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent, + TxpoolInspect, TxpoolStatus, H256, U256, U64, }, utils, }; @@ -28,7 +28,9 @@ use thiserror::Error; use url::{ParseError, Url}; use futures_util::{lock::Mutex, try_join}; -use std::{convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration}; +use std::{ + collections::VecDeque, convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration, +}; use tracing::trace; use tracing_futures::Instrument; @@ -1102,9 +1104,24 @@ impl Middleware for Provider

{ where P: PubsubClient, { + let loaded_logs = match filter.block_option { + FilterBlockOption::Range { from_block, to_block: _ } => { + if from_block.is_none() { + vec![] + } else { + self.get_logs(filter).await? + } + } + FilterBlockOption::AtBlockHash(_block_hash) => self.get_logs(filter).await?, + }; + let loaded_logs = VecDeque::from(loaded_logs); + let logs = utils::serialize(&"logs"); // TODO: Make this a static let filter = utils::serialize(filter); - self.subscribe([logs, filter]).await + self.subscribe([logs, filter]).await.map(|mut stream| { + stream.set_loaded_elements(loaded_logs); + stream + }) } async fn fee_history + Send + Sync>( diff --git a/ethers-providers/src/pubsub.rs b/ethers-providers/src/pubsub.rs index 9aed8252..63339437 100644 --- a/ethers-providers/src/pubsub.rs +++ b/ethers-providers/src/pubsub.rs @@ -7,6 +7,7 @@ use pin_project::{pin_project, pinned_drop}; use serde::de::DeserializeOwned; use serde_json::value::RawValue; use std::{ + collections::VecDeque, marker::PhantomData, pin::Pin, task::{Context, Poll}, @@ -31,6 +32,8 @@ pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> { /// The subscription's installed id on the ethereum node pub id: U256, + loaded_elements: VecDeque, + provider: &'a Provider

, #[pin] @@ -54,13 +57,17 @@ where pub fn new(id: U256, provider: &'a Provider

) -> Result { // Call the underlying PubsubClient's subscribe let rx = provider.as_ref().subscribe(id)?; - Ok(Self { id, provider, rx, ret: PhantomData }) + Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: VecDeque::new() }) } /// Unsubscribes from the subscription. pub async fn unsubscribe(&self) -> Result { self.provider.unsubscribe(self.id).await } + + pub fn set_loaded_elements(&mut self, loaded_elements: VecDeque) { + self.loaded_elements = loaded_elements; + } } // Each subscription item is a serde_json::Value which must be decoded to the @@ -74,6 +81,11 @@ where type Item = R; fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + if !self.loaded_elements.is_empty() { + let next_element = self.get_mut().loaded_elements.pop_front(); + return Poll::Ready(next_element) + } + let this = self.project(); match futures_util::ready!(this.rx.poll_next(ctx)) { Some(item) => match serde_json::from_str(item.get()) { diff --git a/examples/subscribe_logs.rs b/examples/subscribe_logs.rs new file mode 100644 index 00000000..18216db9 --- /dev/null +++ b/examples/subscribe_logs.rs @@ -0,0 +1,34 @@ +use ethers::{abi::AbiDecode, prelude::*, utils::keccak256}; +use eyre::Result; +use std::sync::Arc; + +#[tokio::main] +async fn main() -> Result<()> { + let client = + Provider::::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27") + .await?; + let client = Arc::new(client); + + let last_block = client.get_block(BlockNumber::Latest).await?.unwrap().number.unwrap(); + println!("last_block: {}", last_block); + + let erc20_transfer_filter = Filter::new() + .from_block(last_block - 25) + .topic0(ValueOrArray::Value(H256::from(keccak256("Transfer(address,address,uint256)")))); + + let mut stream = client.subscribe_logs(&erc20_transfer_filter).await?; + + while let Some(log) = stream.next().await { + println!( + "block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}", + log.block_number, + log.transaction_hash, + log.address, + Address::from(log.topics[1]), + Address::from(log.topics[2]), + U256::decode(log.data) + ); + } + + Ok(()) +}