diff --git a/CHANGELOG.md b/CHANGELOG.md index 580f6330..c755aa1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -119,6 +119,8 @@ ### Unreleased +- Add `EventStream::select` to combine streams with different event types + [#725](https://github.com/gakonst/ethers-rs/pull/725) - Substitute output tuples with rust struct types for function calls [#664](https://github.com/gakonst/ethers-rs/pull/664) - Add AbiType implementation during EthAbiType expansion diff --git a/ethers-contract/src/stream.rs b/ethers-contract/src/stream.rs index e48ecf21..8abc99a6 100644 --- a/ethers-contract/src/stream.rs +++ b/ethers-contract/src/stream.rs @@ -1,6 +1,9 @@ use crate::LogMeta; use ethers_core::types::{Log, U256}; -use futures_util::stream::{Stream, StreamExt}; +use futures_util::{ + future::Either, + stream::{Stream, StreamExt}, +}; use pin_project::pin_project; use std::{ pin::Pin, @@ -23,6 +26,7 @@ pub struct EventStream<'a, T, R, E> { } impl<'a, T, R, E> EventStream<'a, T, R, E> { + /// Turns this stream of events into a stream that also yields the event's metadata pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> { EventStreamMeta(self) } @@ -49,9 +53,140 @@ where } } +impl<'a, T, R, E> EventStream<'a, T, R, E> +where + T: Stream + Unpin + 'a, + R: 'a, + E: 'a, +{ + /// This function will attempt to pull events from both event streams. Each + /// stream will be polled in a round-robin fashion, and whenever a stream is + /// ready to yield an event that event is yielded. + /// + /// After one of the two event streams completes, the remaining one will be + /// polled exclusively. The returned stream completes when both input + /// streams have completed. + /// + /// + /// Note that this function consumes both streams and returns a wrapped + /// version of them. + /// The item of the wrapped stream is an `Either`, and the items that the `self` streams yields + /// will be stored in the left-hand variant of that `Either` and the other stream's (`st`) items + /// will be wrapped into the right-hand variant of that `Either`. + /// + /// # Example + /// + /// ``` + /// # async fn test(contract: ethers_contract::Contract) { + /// # use ethers_core::types::*; + /// # use futures_util::stream::StreamExt; + /// # use futures_util::future::Either; + /// # use ethers_contract::{Contract, ContractFactory, EthEvent}; + /// + /// #[derive(Clone, Debug, EthEvent)] + /// pub struct Approval { + /// #[ethevent(indexed)] + /// pub token_owner: Address, + /// #[ethevent(indexed)] + /// pub spender: Address, + /// pub tokens: U256, + /// } + /// + /// #[derive(Clone, Debug, EthEvent)] + /// pub struct Transfer { + /// #[ethevent(indexed)] + /// pub from: Address, + /// #[ethevent(indexed)] + /// pub to: Address, + /// pub tokens: U256, + /// } + /// + /// + /// let ev1 = contract.event::().from_block(1337).to_block(2000); + /// let ev2 = contract.event::(); + /// + /// let mut events = ev1.stream().await.unwrap().select(ev2.stream().await.unwrap()).ok(); + /// + /// while let Some(either) = events.next().await { + /// match either { + /// Either::Left(approval) => { let Approval{token_owner,spender,tokens} = approval; } + /// Either::Right(transfer) => { let Transfer{from,to,tokens} = transfer; } + /// } + /// } + /// + /// # } + /// ``` + pub fn select(self, st: St) -> SelectEvent, St::Item>> + where + St: Stream + Unpin + 'a, + { + SelectEvent(Box::pin(futures_util::stream::select( + self.map(Either::Left), + st.map(Either::Right), + ))) + } +} + +pub type SelectEither<'a, L, R> = Pin> + 'a>>; + +#[pin_project] +pub struct SelectEvent(#[pin] T); + +impl<'a, T, L, LE, R, RE> SelectEvent +where + T: Stream, Result>> + 'a, + L: 'a, + LE: 'a, + R: 'a, + RE: 'a, +{ + /// Turns a stream of Results to a stream of `Result::ok` for both arms + pub fn ok(self) -> Pin> + 'a>> { + Box::pin(self.filter_map(|e| async move { + match e { + Either::Left(res) => res.ok().map(Either::Left), + Either::Right(res) => res.ok().map(Either::Right), + } + })) + } +} + +impl Stream for SelectEvent { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.0.poll_next(cx) + } +} + +/// Wrapper around a `EventStream`, that in addition to the deserialized Event type also yields the +/// `LogMeta`. #[pin_project] pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>); +impl<'a, T, R, E> EventStreamMeta<'a, T, R, E> +where + T: Stream + Unpin + 'a, + R: 'a, + E: 'a, +{ + /// See `EventStream::select` + #[allow(clippy::type_complexity)] + pub fn select( + self, + st: St, + ) -> SelectEvent, St::Item>> + where + St: Stream + Unpin + 'a, + { + SelectEvent(Box::pin(futures_util::stream::select( + self.map(Either::Left), + st.map(Either::Right), + ))) + } +} + impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E> where T: Stream + Unpin,