From 49dccf9d89be7bf13fe091c267b4eee7bc018d24 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 9 Mar 2023 22:59:13 +0100 Subject: [PATCH] fix: dont poll stream again if done (#2245) --- ethers-providers/src/stream/tx_stream.rs | 32 +++++++++++++----------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/ethers-providers/src/stream/tx_stream.rs b/ethers-providers/src/stream/tx_stream.rs index b4fb723e..5ec54334 100644 --- a/ethers-providers/src/stream/tx_stream.rs +++ b/ethers-providers/src/stream/tx_stream.rs @@ -56,6 +56,8 @@ pub struct TransactionStream<'a, P, St> { pub(crate) provider: &'a Provider

, /// A stream of transaction hashes. pub(crate) stream: St, + /// Marks if the stream is done + stream_done: bool, /// max allowed futures to execute at once. pub(crate) max_concurrent: usize, } @@ -68,6 +70,7 @@ impl<'a, P: JsonRpcClient, St> TransactionStream<'a, P, St> { buffered: Default::default(), provider, stream, + stream_done: false, max_concurrent, } } @@ -102,21 +105,22 @@ where } } - let mut stream_done = false; - loop { - match Stream::poll_next(Pin::new(&mut this.stream), cx) { - Poll::Ready(Some(tx)) => { - if this.pending.len() < this.max_concurrent { - this.push_tx(tx); - } else { - this.buffered.push_back(tx); + if !this.stream_done { + loop { + match Stream::poll_next(Pin::new(&mut this.stream), cx) { + Poll::Ready(Some(tx)) => { + if this.pending.len() < this.max_concurrent { + this.push_tx(tx); + } else { + this.buffered.push_back(tx); + } } + Poll::Ready(None) => { + this.stream_done = true; + break + } + _ => break, } - Poll::Ready(None) => { - stream_done = true; - break - } - _ => break, } } @@ -125,7 +129,7 @@ where return tx } - if stream_done && this.pending.is_empty() { + if this.stream_done && this.pending.is_empty() { // all done return Poll::Ready(None) }