fix: dont poll stream again if done (#2245)

This commit is contained in:
Matthias Seitz 2023-03-09 22:59:13 +01:00 committed by GitHub
parent ce26dfc0d6
commit 49dccf9d89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 18 additions and 14 deletions

View File

@ -56,6 +56,8 @@ pub struct TransactionStream<'a, P, St> {
pub(crate) provider: &'a Provider<P>, pub(crate) provider: &'a Provider<P>,
/// A stream of transaction hashes. /// A stream of transaction hashes.
pub(crate) stream: St, pub(crate) stream: St,
/// Marks if the stream is done
stream_done: bool,
/// max allowed futures to execute at once. /// max allowed futures to execute at once.
pub(crate) max_concurrent: usize, pub(crate) max_concurrent: usize,
} }
@ -68,6 +70,7 @@ impl<'a, P: JsonRpcClient, St> TransactionStream<'a, P, St> {
buffered: Default::default(), buffered: Default::default(),
provider, provider,
stream, stream,
stream_done: false,
max_concurrent, max_concurrent,
} }
} }
@ -102,21 +105,22 @@ where
} }
} }
let mut stream_done = false; if !this.stream_done {
loop { loop {
match Stream::poll_next(Pin::new(&mut this.stream), cx) { match Stream::poll_next(Pin::new(&mut this.stream), cx) {
Poll::Ready(Some(tx)) => { Poll::Ready(Some(tx)) => {
if this.pending.len() < this.max_concurrent { if this.pending.len() < this.max_concurrent {
this.push_tx(tx); this.push_tx(tx);
} else { } else {
this.buffered.push_back(tx); this.buffered.push_back(tx);
}
} }
Poll::Ready(None) => {
this.stream_done = true;
break
}
_ => break,
} }
Poll::Ready(None) => {
stream_done = true;
break
}
_ => break,
} }
} }
@ -125,7 +129,7 @@ where
return tx return tx
} }
if stream_done && this.pending.is_empty() { if this.stream_done && this.pending.is_empty() {
// all done // all done
return Poll::Ready(None) return Poll::Ready(None)
} }