diff --git a/ethers-providers/src/log_query.rs b/ethers-providers/src/log_query.rs index 77cdaee3..43cf7086 100644 --- a/ethers-providers/src/log_query.rs +++ b/ethers-providers/src/log_query.rs @@ -1,4 +1,4 @@ -use super::{JsonRpcClient, Middleware, PinBoxFut, Provider}; +use super::{JsonRpcClient, Middleware, PinBoxFut, Provider, ProviderError}; use ethers_core::types::{Filter, Log, U64}; use futures_core::stream::Stream; use std::{ @@ -6,6 +6,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use thiserror::Error; pub struct LogQuery<'a, P> { provider: &'a Provider

, @@ -55,11 +56,19 @@ macro_rules! rewake_with_new_state { }; } +#[derive(Error, Debug)] +pub enum LogQueryError { + #[error(transparent)] + LoadLastBlockError(E), + #[error(transparent)] + LoadLogsError(E), +} + impl<'a, P> Stream for LogQuery<'a, P> where P: JsonRpcClient, { - type Item = Log; + type Item = Result>; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { match &mut self.state { @@ -77,27 +86,32 @@ where } } LogQueryState::LoadLastBlock(fut) => { - self.last_block = Some( - futures_util::ready!(fut.as_mut().poll(ctx)) - .expect("error occurred loading last block"), - ); + match futures_util::ready!(fut.as_mut().poll(ctx)) { + Ok(last_block) => { + self.last_block = Some(last_block); - let from_block = self.filter.get_from_block().unwrap(); - let to_block = from_block + self.page_size; - self.from_block = Some(to_block); + // this is okay because we will only enter this state when the filter is + // paginatable i.e. from block is set + let from_block = self.filter.get_from_block().unwrap(); + let to_block = from_block + self.page_size; + self.from_block = Some(to_block + 1); - let filter = self.filter.clone().from_block(from_block).to_block(to_block); - let provider = self.provider; - // load first page of logs - let fut = Box::pin(async move { provider.get_logs(&filter).await }); - rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); - } - LogQueryState::LoadLogs(fut) => { - let logs = futures_util::ready!(fut.as_mut().poll(ctx)) - .expect("error occurred loading logs"); - self.current_logs = VecDeque::from(logs); - rewake_with_new_state!(ctx, self, LogQueryState::Consume); + let filter = self.filter.clone().from_block(from_block).to_block(to_block); + let provider = self.provider; + // load first page of logs + let fut = Box::pin(async move { provider.get_logs(&filter).await }); + rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); + } + Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))), + } } + LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) { + Ok(logs) => { + self.current_logs = VecDeque::from(logs); + rewake_with_new_state!(ctx, self, LogQueryState::Consume); + } + Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))), + }, LogQueryState::Consume => { let log = self.current_logs.pop_front(); if log.is_none() { @@ -106,15 +120,17 @@ where Poll::Ready(None) } else { // load new logs if there are still more pages to go through + // can safely assume this will always be set in this state let from_block = self.from_block.unwrap(); let to_block = from_block + self.page_size; // no more pages to load, and everything is consumed + // can safely assume this will always be set in this state if from_block > self.last_block.unwrap() { return Poll::Ready(None) } // load next page - self.from_block = Some(to_block); + self.from_block = Some(to_block + 1); let filter = self.filter.clone().from_block(from_block).to_block(to_block); let provider = self.provider; @@ -122,7 +138,7 @@ where rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); } } else { - Poll::Ready(log) + Poll::Ready(log.map(Ok)) } } } diff --git a/examples/paginated_logs.rs b/examples/paginated_logs.rs index f911fecf..689d80ad 100644 --- a/examples/paginated_logs.rs +++ b/examples/paginated_logs.rs @@ -1,10 +1,10 @@ -use ethers::{abi::AbiDecode, prelude::*, utils::keccak256}; +use ethers::{abi::AbiDecode, prelude::*, providers::Middleware, utils::keccak256}; use eyre::Result; use std::sync::Arc; #[tokio::main] async fn main() -> Result<()> { - let client = + let client: Provider = Provider::::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27") .await?; let client = Arc::new(client); @@ -18,7 +18,8 @@ async fn main() -> Result<()> { let mut stream = client.get_logs_paginated(&erc20_transfer_filter, 10); - while let Some(log) = stream.next().await { + while let Some(res) = stream.next().await { + let log = res?; println!( "block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}", log.block_number,