`get_paginated_logs` fixes and refactoring (#1328)

* fix(providers): do not load boundary blocks on every page

* refactor(providers): make get_paginated_logs TryStream
Meet Mangukiya 2022-05-31 22:13:03 +05:30 committed by GitHub
parent 8e3529e9b0
commit e0db2f0606
2 changed files with 42 additions and 25 deletions
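
For context, a minimal sketch of how the stream is consumed after this refactor: each item is now a Result<Log, LogQueryError<ProviderError>>, so the TryStream combinators from futures apply. This is not part of the diff below; the endpoint URL, starting block and page size are placeholders, not values from this commit.

// Sketch only: consuming the refactored paginated stream. Assumes ethers with
// the `ws` feature plus the `futures-util`, `tokio` and `eyre` crates; the URL,
// block number and page size are illustrative.
use ethers::{prelude::*, providers::Middleware};
use futures_util::TryStreamExt; // try_next comes from the blanket TryStream impl
use std::sync::Arc;

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let client = Provider::<Ws>::connect("wss://example-node.invalid/ws").await?;
    let client = Arc::new(client);

    // request 100 blocks worth of logs per eth_getLogs call; with this fix the
    // next page starts at to_block + 1, so boundary blocks are fetched only once
    let filter = Filter::new().from_block(15_000_000u64);
    let mut stream = client.get_logs_paginated(&filter, 100);

    // a failed eth_blockNumber or eth_getLogs call now surfaces as an Err item
    // instead of panicking inside the stream
    while let Some(log) = stream.try_next().await? {
        println!("block: {:?}, tx: {:?}", log.block_number, log.transaction_hash);
    }
    Ok(())
}

Returning Result items instead of panicking lets callers decide whether to retry a failed page or abort the whole query.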

Changed file 1 of 2: the LogQuery stream implementation

@@ -1,4 +1,4 @@
-use super::{JsonRpcClient, Middleware, PinBoxFut, Provider};
+use super::{JsonRpcClient, Middleware, PinBoxFut, Provider, ProviderError};
 use ethers_core::types::{Filter, Log, U64};
 use futures_core::stream::Stream;
 use std::{
@@ -6,6 +6,7 @@ use std::{
     pin::Pin,
     task::{Context, Poll},
 };
+use thiserror::Error;
 
 pub struct LogQuery<'a, P> {
     provider: &'a Provider<P>,
@@ -55,11 +56,19 @@ macro_rules! rewake_with_new_state {
     };
 }
 
+#[derive(Error, Debug)]
+pub enum LogQueryError<E> {
+    #[error(transparent)]
+    LoadLastBlockError(E),
+    #[error(transparent)]
+    LoadLogsError(E),
+}
+
 impl<'a, P> Stream for LogQuery<'a, P>
 where
     P: JsonRpcClient,
 {
-    type Item = Log;
+    type Item = Result<Log, LogQueryError<ProviderError>>;
 
     fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         match &mut self.state {
@@ -77,27 +86,32 @@
                 }
             }
             LogQueryState::LoadLastBlock(fut) => {
-                self.last_block = Some(
-                    futures_util::ready!(fut.as_mut().poll(ctx))
-                        .expect("error occurred loading last block"),
-                );
-
-                let from_block = self.filter.get_from_block().unwrap();
-                let to_block = from_block + self.page_size;
-                self.from_block = Some(to_block);
-
-                let filter = self.filter.clone().from_block(from_block).to_block(to_block);
-                let provider = self.provider;
-                // load first page of logs
-                let fut = Box::pin(async move { provider.get_logs(&filter).await });
-                rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
-            }
-            LogQueryState::LoadLogs(fut) => {
-                let logs = futures_util::ready!(fut.as_mut().poll(ctx))
-                    .expect("error occurred loading logs");
-                self.current_logs = VecDeque::from(logs);
-                rewake_with_new_state!(ctx, self, LogQueryState::Consume);
-            }
+                match futures_util::ready!(fut.as_mut().poll(ctx)) {
+                    Ok(last_block) => {
+                        self.last_block = Some(last_block);
+
+                        // this is okay because we will only enter this state when the filter is
+                        // paginatable i.e. from block is set
+                        let from_block = self.filter.get_from_block().unwrap();
+                        let to_block = from_block + self.page_size;
+                        self.from_block = Some(to_block + 1);
+
+                        let filter = self.filter.clone().from_block(from_block).to_block(to_block);
+                        let provider = self.provider;
+                        // load first page of logs
+                        let fut = Box::pin(async move { provider.get_logs(&filter).await });
+                        rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
+                    }
+                    Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
+                }
+            }
+            LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
+                Ok(logs) => {
+                    self.current_logs = VecDeque::from(logs);
+                    rewake_with_new_state!(ctx, self, LogQueryState::Consume);
+                }
+                Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
+            },
             LogQueryState::Consume => {
                 let log = self.current_logs.pop_front();
                 if log.is_none() {
@@ -106,15 +120,17 @@ where
                         Poll::Ready(None)
                     } else {
                         // load new logs if there are still more pages to go through
+                        // can safely assume this will always be set in this state
                        let from_block = self.from_block.unwrap();
                        let to_block = from_block + self.page_size;

                        // no more pages to load, and everything is consumed
+                        // can safely assume this will always be set in this state
                        if from_block > self.last_block.unwrap() {
                            return Poll::Ready(None)
                        }

                        // load next page
-                        self.from_block = Some(to_block);
+                        self.from_block = Some(to_block + 1);
                        let filter = self.filter.clone().from_block(from_block).to_block(to_block);
                        let provider = self.provider;
@@ -122,7 +138,7 @@ where
                         rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
                     }
                 } else {
-                    Poll::Ready(log)
+                    Poll::Ready(log.map(Ok))
                 }
             }
         }
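
Two notes on the hunks above. The pagination fix is an off-by-one: with from_block = 100 and page_size = 10 (illustrative numbers), the old code stored Some(to_block) and therefore requested blocks 100-110 followed by 110-120, fetching the boundary block 110 twice; storing Some(to_block + 1) makes the next page start at 111. And because failures now surface as LogQueryError variants rather than panics, a caller can tell a failed chain-tip lookup apart from a failed page load. A rough sketch follows; the import path of LogQueryError is an assumption (this commit does not touch the crate's re-exports), and the page size is illustrative:

// Sketch only: handling the two failure modes separately instead of bubbling
// them up with `?`. `LogQueryError`'s path is assumed, not confirmed by this diff.
use ethers::prelude::*;
use ethers::providers::{LogQueryError, Middleware, Provider, Ws};
use futures_util::StreamExt;

async fn drain_logs(client: &Provider<Ws>, filter: &Filter) {
    let mut stream = client.get_logs_paginated(filter, 100);
    while let Some(res) = stream.next().await {
        match res {
            Ok(log) => println!("log in block {:?}", log.block_number),
            // the initial eth_blockNumber call failed
            Err(LogQueryError::LoadLastBlockError(e)) => eprintln!("could not load last block: {e}"),
            // one eth_getLogs page failed
            Err(LogQueryError::LoadLogsError(e)) => eprintln!("could not load a page of logs: {e}"),
        }
    }
}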

Changed file 2 of 2: the paginated logs example

@@ -1,10 +1,10 @@
-use ethers::{abi::AbiDecode, prelude::*, utils::keccak256};
+use ethers::{abi::AbiDecode, prelude::*, providers::Middleware, utils::keccak256};
 use eyre::Result;
 use std::sync::Arc;
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let client =
+    let client: Provider<Ws> =
         Provider::<Ws>::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27")
             .await?;
     let client = Arc::new(client);
@@ -18,7 +18,8 @@ async fn main() -> Result<()> {
     let mut stream = client.get_logs_paginated(&erc20_transfer_filter, 10);
 
-    while let Some(log) = stream.next().await {
+    while let Some(res) = stream.next().await {
+        let log = res?;
         println!(
             "block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}",
             log.block_number,