Improve Stream performance (#25)

* perf(provider): remove infinite loop and rely on the runtime to poll the stream

* chore: cargo fmt
This commit is contained in:
Georgios Konstantopoulos 2020-06-17 21:01:20 +03:00 committed by GitHub
parent a558a4237b
commit a9d1be4def
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 49 deletions

View File

@ -104,38 +104,37 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let mut this = self.project();
loop { *this.state = match this.state {
*this.state = match this.state { FilterWatcherState::WaitForInterval => {
FilterWatcherState::WaitForInterval => { // Wait the polling period
// Wait the polling period let mut interval = Box::pin(this.interval.tick());
let mut interval = Box::pin(this.interval.tick()); let _ready = futures_util::ready!(interval.as_mut().poll(cx));
let _ready = futures_util::ready!(interval.as_mut().poll(cx));
// create a new instance of the future // create a new instance of the future
FilterWatcherState::GetFilterChanges(this.factory.as_mut().new()) FilterWatcherState::GetFilterChanges(this.factory.as_mut().new())
} }
FilterWatcherState::GetFilterChanges(fut) => { FilterWatcherState::GetFilterChanges(fut) => {
// wait for the future to be ready // wait for the future to be ready
let mut fut = Box::pin(fut); let mut fut = Box::pin(fut);
// NOTE: If the provider returns an error, this will return an empty // NOTE: If the provider returns an error, this will return an empty
// vector. Should we make this return a Result instead? Ideally if we're // vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error // in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error). // is encountered (since it might be a temporary error).
let items: Vec<R> = let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default(); FilterWatcherState::NextItem(items.into_iter())
FilterWatcherState::NextItem(items.into_iter()) }
} // Consume 1 element from the vector. If more elements are in the vector,
// Consume 1 element from the vector. If more elements are in the vector, // the next call will immediately go to this branch instead of trying to get
// the next call will immediately go to this branch instead of trying to get // filter changes again. Once the whole vector is consumed, it will poll again
// filter changes again. Once the whole vector is consumed, it will poll again // for new logs
// for new logs FilterWatcherState::NextItem(iter) => match iter.next() {
FilterWatcherState::NextItem(iter) => match iter.next() { Some(item) => return Poll::Ready(Some(item)),
Some(item) => return Poll::Ready(Some(item)), None => FilterWatcherState::WaitForInterval,
None => FilterWatcherState::WaitForInterval, },
}, };
};
} Poll::Pending
} }
} }

View File

@ -1,19 +0,0 @@
use anyhow::Result;
use ethers::prelude::*;
use std::convert::TryFrom;
#[tokio::main]
async fn main() -> Result<()> {
// connect to the network
let provider = Provider::<Http>::try_from("http://localhost:8545")?;
let filter = Filter::new()
.address_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d")?
.event("ValueChanged(address,string,string)") // event name
.topic0("9729a6fbefefc8f6005933898b13dc45c3a2c8b7".parse::<Address>()?); // indexed param
let logs = provider.get_logs(&filter).await?;
println!("Got logs: {}", serde_json::to_string(&logs).unwrap());
Ok(())
}

View File

@ -0,0 +1,13 @@
use ethers::prelude::*;
use std::convert::TryFrom;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let provider = Provider::<Http>::try_from("http://localhost:8545")?;
let mut stream = provider.watch_blocks().await?.interval(2000u64).stream();
while let Some(block) = stream.next().await {
dbg!(block);
}
Ok(())
}