From 73d3d3f494f58599d21f5a2cf3bf863d2e4d5894 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 30 Jun 2022 00:26:37 +0200 Subject: [PATCH] perf(providers): replace wake_by_ref with loop (#1428) --- ethers-providers/src/stream.rs | 71 +++++++++++++++------------------- 1 file changed, 32 insertions(+), 39 deletions(-) diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index 0a0eb812..6cd73c7d 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -1,9 +1,7 @@ #![allow(clippy::return_self_not_must_use)] use crate::{JsonRpcClient, Middleware, PinBoxFut, Provider, ProviderError}; - use ethers_core::types::{Transaction, TxHash, U256}; - use futures_core::{stream::Stream, Future}; use futures_util::{stream, stream::FuturesUnordered, FutureExt, StreamExt}; use pin_project::pin_project; @@ -37,8 +35,8 @@ enum FilterWatcherState<'a, R> { } #[must_use = "filters do nothing unless you stream them"] -#[pin_project] /// Streams data from an installed filter via `eth_getFilterChanges` +#[pin_project] pub struct FilterWatcher<'a, P, R> { /// The filter's installed id on the ethereum node pub id: U256, @@ -47,7 +45,7 @@ pub struct FilterWatcher<'a, P, R> { // The polling interval interval: Box + Send + Unpin>, - + /// statemachine driven by the Stream impl state: FilterWatcherState<'a, R>, } @@ -79,8 +77,7 @@ where } } -// Pattern for flattening the returned Vec of filter changes taken from -// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67 +// Advances the filter's state machine impl<'a, P, R> Stream for FilterWatcher<'a, P, R> where P: JsonRpcClient, @@ -89,42 +86,38 @@ where type Item = R; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = self.project(); + let mut this = self.project(); let id = *this.id; - *this.state = match this.state { - FilterWatcherState::WaitForInterval => { - // Wait the polling period - let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx)); - - // create a new instance of the future - cx.waker().wake_by_ref(); - let fut = Box::pin(this.provider.get_filter_changes(id)); - FilterWatcherState::GetFilterChanges(fut) - } - FilterWatcherState::GetFilterChanges(fut) => { - // NOTE: If the provider returns an error, this will return an empty - // vector. Should we make this return a Result instead? Ideally if we're - // in a streamed loop we wouldn't want the loop to terminate if an error - // is encountered (since it might be a temporary error). - let items: Vec = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); - cx.waker().wake_by_ref(); - FilterWatcherState::NextItem(items.into_iter()) - } - // Consume 1 element from the vector. If more elements are in the vector, - // the next call will immediately go to this branch instead of trying to get - // filter changes again. Once the whole vector is consumed, it will poll again - // for new logs - FilterWatcherState::NextItem(iter) => { - cx.waker().wake_by_ref(); - match iter.next() { - Some(item) => return Poll::Ready(Some(item)), - None => FilterWatcherState::WaitForInterval, + loop { + *this.state = match &mut this.state { + FilterWatcherState::WaitForInterval => { + // Wait the polling period + let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx)); + let fut = Box::pin(this.provider.get_filter_changes(id)); + FilterWatcherState::GetFilterChanges(fut) } - } - }; - - Poll::Pending + FilterWatcherState::GetFilterChanges(fut) => { + // NOTE: If the provider returns an error, this will return an empty + // vector. Should we make this return a Result instead? Ideally if we're + // in a streamed loop we wouldn't want the loop to terminate if an error + // is encountered (since it might be a temporary error). + let items: Vec = + futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); + FilterWatcherState::NextItem(items.into_iter()) + } + // Consume 1 element from the vector. If more elements are in the vector, + // the next call will immediately go to this branch instead of trying to get + // filter changes again. Once the whole vector is consumed, it will poll again + // for new logs + FilterWatcherState::NextItem(iter) => { + if let item @ Some(_) = iter.next() { + return Poll::Ready(item) + } + FilterWatcherState::WaitForInterval + } + }; + } } }