From a9d1be4def673e203fb7c9bd7ffe550dff3c00a8 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Wed, 17 Jun 2020 21:01:20 +0300 Subject: [PATCH] Improve Stream performance (#25) * perf(provider): remove infinite loop and rely on the runtime to poll the stream * chore: cargo fmt --- ethers-providers/src/stream.rs | 59 ++++++++++++++++----------------- ethers/examples/get_logs.rs | 19 ----------- ethers/examples/watch_blocks.rs | 13 ++++++++ 3 files changed, 42 insertions(+), 49 deletions(-) delete mode 100644 ethers/examples/get_logs.rs create mode 100644 ethers/examples/watch_blocks.rs diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index 613567ea..1ca82a43 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -104,38 +104,37 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - loop { - *this.state = match this.state { - FilterWatcherState::WaitForInterval => { - // Wait the polling period - let mut interval = Box::pin(this.interval.tick()); - let _ready = futures_util::ready!(interval.as_mut().poll(cx)); + *this.state = match this.state { + FilterWatcherState::WaitForInterval => { + // Wait the polling period + let mut interval = Box::pin(this.interval.tick()); + let _ready = futures_util::ready!(interval.as_mut().poll(cx)); - // create a new instance of the future - FilterWatcherState::GetFilterChanges(this.factory.as_mut().new()) - } - FilterWatcherState::GetFilterChanges(fut) => { - // wait for the future to be ready - let mut fut = Box::pin(fut); + // create a new instance of the future + FilterWatcherState::GetFilterChanges(this.factory.as_mut().new()) + } + FilterWatcherState::GetFilterChanges(fut) => { + // wait for the future to be ready + let mut fut = Box::pin(fut); - // NOTE: If the provider returns an error, this will return an empty - // vector. Should we make this return a Result instead? Ideally if we're - // in a streamed loop we wouldn't want the loop to terminate if an error - // is encountered (since it might be a temporary error). - let items: Vec = - futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); - FilterWatcherState::NextItem(items.into_iter()) - } - // Consume 1 element from the vector. If more elements are in the vector, - // the next call will immediately go to this branch instead of trying to get - // filter changes again. Once the whole vector is consumed, it will poll again - // for new logs - FilterWatcherState::NextItem(iter) => match iter.next() { - Some(item) => return Poll::Ready(Some(item)), - None => FilterWatcherState::WaitForInterval, - }, - }; - } + // NOTE: If the provider returns an error, this will return an empty + // vector. Should we make this return a Result instead? Ideally if we're + // in a streamed loop we wouldn't want the loop to terminate if an error + // is encountered (since it might be a temporary error). + let items: Vec = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); + FilterWatcherState::NextItem(items.into_iter()) + } + // Consume 1 element from the vector. If more elements are in the vector, + // the next call will immediately go to this branch instead of trying to get + // filter changes again. Once the whole vector is consumed, it will poll again + // for new logs + FilterWatcherState::NextItem(iter) => match iter.next() { + Some(item) => return Poll::Ready(Some(item)), + None => FilterWatcherState::WaitForInterval, + }, + }; + + Poll::Pending } } diff --git a/ethers/examples/get_logs.rs b/ethers/examples/get_logs.rs deleted file mode 100644 index dd9d5108..00000000 --- a/ethers/examples/get_logs.rs +++ /dev/null @@ -1,19 +0,0 @@ -use anyhow::Result; -use ethers::prelude::*; -use std::convert::TryFrom; - -#[tokio::main] -async fn main() -> Result<()> { - // connect to the network - let provider = Provider::::try_from("http://localhost:8545")?; - - let filter = Filter::new() - .address_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d")? - .event("ValueChanged(address,string,string)") // event name - .topic0("9729a6fbefefc8f6005933898b13dc45c3a2c8b7".parse::
()?); // indexed param - - let logs = provider.get_logs(&filter).await?; - println!("Got logs: {}", serde_json::to_string(&logs).unwrap()); - - Ok(()) -} diff --git a/ethers/examples/watch_blocks.rs b/ethers/examples/watch_blocks.rs new file mode 100644 index 00000000..25741fb8 --- /dev/null +++ b/ethers/examples/watch_blocks.rs @@ -0,0 +1,13 @@ +use ethers::prelude::*; +use std::convert::TryFrom; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let provider = Provider::::try_from("http://localhost:8545")?; + let mut stream = provider.watch_blocks().await?.interval(2000u64).stream(); + while let Some(block) = stream.next().await { + dbg!(block); + } + + Ok(()) +}