perf(providers): replace wake_by_ref with loop (#1428)

This commit is contained in:
Matthias Seitz 2022-06-30 00:26:37 +02:00 committed by GitHub
parent 7e85b33167
commit 73d3d3f494
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 39 deletions

View File

@ -1,9 +1,7 @@
#![allow(clippy::return_self_not_must_use)] #![allow(clippy::return_self_not_must_use)]
use crate::{JsonRpcClient, Middleware, PinBoxFut, Provider, ProviderError}; use crate::{JsonRpcClient, Middleware, PinBoxFut, Provider, ProviderError};
use ethers_core::types::{Transaction, TxHash, U256}; use ethers_core::types::{Transaction, TxHash, U256};
use futures_core::{stream::Stream, Future}; use futures_core::{stream::Stream, Future};
use futures_util::{stream, stream::FuturesUnordered, FutureExt, StreamExt}; use futures_util::{stream, stream::FuturesUnordered, FutureExt, StreamExt};
use pin_project::pin_project; use pin_project::pin_project;
@ -37,8 +35,8 @@ enum FilterWatcherState<'a, R> {
} }
#[must_use = "filters do nothing unless you stream them"] #[must_use = "filters do nothing unless you stream them"]
#[pin_project]
/// Streams data from an installed filter via `eth_getFilterChanges` /// Streams data from an installed filter via `eth_getFilterChanges`
#[pin_project]
pub struct FilterWatcher<'a, P, R> { pub struct FilterWatcher<'a, P, R> {
/// The filter's installed id on the ethereum node /// The filter's installed id on the ethereum node
pub id: U256, pub id: U256,
@ -47,7 +45,7 @@ pub struct FilterWatcher<'a, P, R> {
// The polling interval // The polling interval
interval: Box<dyn Stream<Item = ()> + Send + Unpin>, interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
/// statemachine driven by the Stream impl
state: FilterWatcherState<'a, R>, state: FilterWatcherState<'a, R>,
} }
@ -79,8 +77,7 @@ where
} }
} }
// Pattern for flattening the returned Vec of filter changes taken from // Advances the filter's state machine
// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67
impl<'a, P, R> Stream for FilterWatcher<'a, P, R> impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where where
P: JsonRpcClient, P: JsonRpcClient,
@ -89,16 +86,14 @@ where
type Item = R; type Item = R;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project(); let mut this = self.project();
let id = *this.id; let id = *this.id;
*this.state = match this.state { loop {
*this.state = match &mut this.state {
FilterWatcherState::WaitForInterval => { FilterWatcherState::WaitForInterval => {
// Wait the polling period // Wait the polling period
let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx)); let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));
// create a new instance of the future
cx.waker().wake_by_ref();
let fut = Box::pin(this.provider.get_filter_changes(id)); let fut = Box::pin(this.provider.get_filter_changes(id));
FilterWatcherState::GetFilterChanges(fut) FilterWatcherState::GetFilterChanges(fut)
} }
@ -107,8 +102,8 @@ where
// vector. Should we make this return a Result instead? Ideally if we're // vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error // in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error). // is encountered (since it might be a temporary error).
let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); let items: Vec<R> =
cx.waker().wake_by_ref(); futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
FilterWatcherState::NextItem(items.into_iter()) FilterWatcherState::NextItem(items.into_iter())
} }
// Consume 1 element from the vector. If more elements are in the vector, // Consume 1 element from the vector. If more elements are in the vector,
@ -116,15 +111,13 @@ where
// filter changes again. Once the whole vector is consumed, it will poll again // filter changes again. Once the whole vector is consumed, it will poll again
// for new logs // for new logs
FilterWatcherState::NextItem(iter) => { FilterWatcherState::NextItem(iter) => {
cx.waker().wake_by_ref(); if let item @ Some(_) = iter.next() {
match iter.next() { return Poll::Ready(item)
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
} }
FilterWatcherState::WaitForInterval
} }
}; };
}
Poll::Pending
} }
} }