use ethers_core::types::{Log, U256}; use futures_util::stream::{Stream, StreamExt}; use pin_project::pin_project; use std::pin::Pin; use std::task::{Context, Poll}; type MapEvent<'a, R, E> = Box Result + 'a>; #[pin_project] /// Generic wrapper around Log streams, mapping their content to a specific /// deserialized log struct. /// /// We use this wrapper type instead of `StreamExt::map` in order to preserve /// information about the filter/subscription's id. pub struct EventStream<'a, T, R, E> { pub id: U256, #[pin] stream: T, parse: MapEvent<'a, R, E>, } impl<'a, T, R, E> EventStream<'a, T, R, E> { pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self { Self { id, stream, parse } } } impl<'a, T, R, E> Stream for EventStream<'a, T, R, E> where T: Stream + Unpin, { type Item = Result; fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { let mut this = self.project(); match futures_util::ready!(this.stream.poll_next_unpin(ctx)) { Some(item) => Poll::Ready(Some((this.parse)(item))), None => Poll::Pending, } } }