diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5d8daf6a..28a1c459 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -164,6 +164,8 @@
 
 ### Unreleased
 
+- Add a paginated logs stream that loads logs in small pages
+  [1285](https://github.com/gakonst/ethers-rs/pull/1285)
 - Load previous logs before subscribing to new logs in case fromBlock is set
   [1264](https://github.com/gakonst/ethers-rs/pull/1264)
 - Add retries to the pending transaction future
diff --git a/ethers-core/src/types/log.rs b/ethers-core/src/types/log.rs
index 17ce8c74..b570e7ac 100644
--- a/ethers-core/src/types/log.rs
+++ b/ethers-core/src/types/log.rs
@@ -371,6 +371,19 @@ impl Filter {
         self.topics[3] = Some(topic.into());
         self
     }
+
+    pub fn is_paginatable(&self) -> bool {
+        self.get_from_block().is_some()
+    }
+
+    pub fn get_from_block(&self) -> Option<U64> {
+        match self.block_option {
+            FilterBlockOption::AtBlockHash(_hash) => None,
+            FilterBlockOption::Range { from_block, to_block: _ } => {
+                from_block.map(|block| block.as_number()).unwrap_or(None)
+            }
+        }
+    }
 }
 
 /// Union type for representing a single value or a vector of values inside a filter
diff --git a/ethers-providers/src/lib.rs b/ethers-providers/src/lib.rs
index 9b7afed0..f82a91f8 100644
--- a/ethers-providers/src/lib.rs
+++ b/ethers-providers/src/lib.rs
@@ -18,6 +18,9 @@ pub use pending_transaction::PendingTransaction;
 mod pending_escalator;
 pub use pending_escalator::EscalatingPending;
 
+mod log_query;
+pub use log_query::LogQuery;
+
 mod stream;
 pub use futures_util::StreamExt;
 pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
@@ -421,6 +424,15 @@ pub trait Middleware: Sync + Send + Debug {
         self.inner().get_logs(filter).await.map_err(FromErr::from)
     }
 
+    /// Returns a stream of logs that are loaded in pages of the given page size
+    fn get_logs_paginated<'a>(
+        &'a self,
+        filter: &Filter,
+        page_size: u64,
+    ) -> LogQuery<'a, Self::Provider> {
+        self.inner().get_logs_paginated(filter, page_size)
+    }
+
     async fn new_filter(&self, filter: FilterKind<'_>) -> Result<U256, Self::Error> {
         self.inner().new_filter(filter).await.map_err(FromErr::from)
     }
diff --git a/ethers-providers/src/log_query.rs b/ethers-providers/src/log_query.rs
new file mode 100644
index 00000000..77cdaee3
--- /dev/null
+++ b/ethers-providers/src/log_query.rs
@@ -0,0 +1,130 @@
+use super::{JsonRpcClient, Middleware, PinBoxFut, Provider};
+use ethers_core::types::{Filter, Log, U64};
+use futures_core::stream::Stream;
+use std::{
+    collections::VecDeque,
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
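+/// A stream of logs that are fetched page by page with `eth_getLogs`, so that a large
+/// block range is queried in windows of `page_size` blocks instead of in one request.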
+pub struct LogQuery<'a, P> {
+    provider: &'a Provider<P>,
+    filter: Filter,
+    from_block: Option<U64>,
+    page_size: u64,
+    current_logs: VecDeque<Log>,
+    last_block: Option<U64>,
+    state: LogQueryState<'a>,
+}
+
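+/// Internal state machine: resolve the current chain tip first (only needed when the
+/// filter is paginatable), then alternate between loading a page of logs and draining it.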
+enum LogQueryState<'a> {
+    Initial,
+    LoadLastBlock(PinBoxFut<'a, U64>),
+    LoadLogs(PinBoxFut<'a, Vec<Log>>),
+    Consume,
+}
+
+impl<'a, P> LogQuery<'a, P>
+where
+    P: JsonRpcClient,
+{
+    pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self {
+        Self {
+            provider,
+            filter: filter.clone(),
+            from_block: filter.get_from_block(),
+            page_size: 10000,
+            current_logs: VecDeque::new(),
+            last_block: None,
+            state: LogQueryState::Initial,
+        }
+    }
+
+    /// Set the page size for pagination
+    pub fn with_page_size(mut self, page_size: u64) -> Self {
+        self.page_size = page_size;
+        self
+    }
+}
+
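+// Store the next state, wake the task so that `poll_next` runs again immediately, and
+// yield `Poll::Pending` in the meantime.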
+macro_rules! rewake_with_new_state {
+    ($ctx:ident, $this:ident, $new_state:expr) => {
+        $this.state = $new_state;
+        $ctx.waker().wake_by_ref();
+        return Poll::Pending
+    };
+}
+
+impl<'a, P> Stream for LogQuery<'a, P>
+where
+    P: JsonRpcClient,
+{
+    type Item = Log;
+
+    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match &mut self.state {
+            LogQueryState::Initial => {
+                if !self.filter.is_paginatable() {
+                    // if not paginatable, load logs and consume
+                    let filter = self.filter.clone();
+                    let provider = self.provider;
+                    let fut = Box::pin(async move { provider.get_logs(&filter).await });
+                    rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
+                } else {
+                    // if paginatable, load last block
+                    let fut = self.provider.get_block_number();
+                    rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
+                }
+            }
+            LogQueryState::LoadLastBlock(fut) => {
+                self.last_block = Some(
+                    futures_util::ready!(fut.as_mut().poll(ctx))
+                        .expect("error occurred loading last block"),
+                );
+
+                let from_block = self.filter.get_from_block().unwrap();
+                let to_block = from_block + self.page_size;
+                // `eth_getLogs` block ranges are inclusive, so the next page starts one
+                // block after the end of this one to avoid duplicate logs at the boundary
+                self.from_block = Some(to_block + 1);
+
+                let filter = self.filter.clone().from_block(from_block).to_block(to_block);
+                let provider = self.provider;
+                // load first page of logs
+                let fut = Box::pin(async move { provider.get_logs(&filter).await });
+                rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
+            }
+            LogQueryState::LoadLogs(fut) => {
+                let logs = futures_util::ready!(fut.as_mut().poll(ctx))
+                    .expect("error occurred loading logs");
+                self.current_logs = VecDeque::from(logs);
+                rewake_with_new_state!(ctx, self, LogQueryState::Consume);
+            }
+            LogQueryState::Consume => {
+                let log = self.current_logs.pop_front();
+                if log.is_none() {
+                    // consumed all the logs
+                    if !self.filter.is_paginatable() {
+                        Poll::Ready(None)
+                    } else {
+                        // load new logs if there are still more pages to go through
+                        let from_block = self.from_block.unwrap();
+                        let to_block = from_block + self.page_size;
+
+                        // no more pages to load, and everything is consumed
+                        if from_block > self.last_block.unwrap() {
+                            return Poll::Ready(None)
+                        }
+                        // load the next page, again skipping past the inclusive end block
+                        self.from_block = Some(to_block + 1);
+
+                        let filter =
+                            self.filter.clone().from_block(from_block).to_block(to_block);
+                        let provider = self.provider;
+                        let fut = Box::pin(async move { provider.get_logs(&filter).await });
+                        rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
+                    }
+                } else {
+                    Poll::Ready(log)
+                }
+            }
+        }
+    }
+}
diff --git a/ethers-providers/src/provider.rs b/ethers-providers/src/provider.rs
index a2d4c48b..a75e7cdf 100644
--- a/ethers-providers/src/provider.rs
+++ b/ethers-providers/src/provider.rs
@@ -2,7 +2,7 @@ use crate::{
     ens, erc, maybe,
     pubsub::{PubsubClient, SubscriptionStream},
     stream::{FilterWatcher, DEFAULT_POLL_INTERVAL},
-    FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, MockProvider,
+    FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, LogQuery, MockProvider,
     PendingTransaction, QuorumProvider, RwClient, SyncingStatus,
 };
@@ -647,6 +647,10 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
         self.request("eth_getLogs", [filter]).await
     }
 
+    fn get_logs_paginated<'a>(&'a self, filter: &Filter, page_size: u64) -> LogQuery<'a, P> {
+        LogQuery::new(self, filter).with_page_size(page_size)
+    }
+
     /// Streams matching filter logs
     async fn watch<'a>(
         &'a self,
diff --git a/examples/paginated_logs.rs b/examples/paginated_logs.rs
new file mode 100644
index 00000000..f911fecf
--- /dev/null
+++ b/examples/paginated_logs.rs
@@ -0,0 +1,34 @@
+use ethers::{abi::AbiDecode, prelude::*, utils::keccak256};
+use eyre::Result;
+use std::sync::Arc;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let client =
+        Provider::<Ws>::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27")
+            .await?;
+    let client = Arc::new(client);
+
+    let last_block = client.get_block(BlockNumber::Latest).await?.unwrap().number.unwrap();
+    println!("last_block: {}", last_block);
+
+    let erc20_transfer_filter = Filter::new()
+        .from_block(last_block - 10000)
+        .topic0(ValueOrArray::Value(H256::from(keccak256("Transfer(address,address,uint256)"))));
+
+    let mut stream = client.get_logs_paginated(&erc20_transfer_filter, 10);
+
+    while let Some(log) = stream.next().await {
+        println!(
+            "block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}",
+            log.block_number,
+            log.transaction_hash,
+            log.address,
+            log.topics.get(1),
+            log.topics.get(2),
+            U256::decode(log.data)
+        );
+    }
+
+    Ok(())
+}