2021-07-30 11:01:38 +00:00
|
|
|
use crate::LogMeta;
|
2020-11-30 09:33:06 +00:00
|
|
|
use ethers_core::types::{Log, U256};
|
2021-12-23 19:23:55 +00:00
|
|
|
use futures_util::{
|
|
|
|
future::Either,
|
|
|
|
stream::{Stream, StreamExt},
|
|
|
|
};
|
2020-11-30 09:33:06 +00:00
|
|
|
use pin_project::pin_project;
|
2021-10-29 12:29:35 +00:00
|
|
|
use std::{
|
|
|
|
pin::Pin,
|
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
2020-11-30 09:33:06 +00:00
|
|
|
|
2021-07-13 11:13:12 +00:00
|
|
|
type MapEvent<'a, R, E> = Box<dyn Fn(Log) -> Result<R, E> + 'a + Send + Sync>;
|
2020-11-30 09:33:06 +00:00
|
|
|
|
|
|
|
#[pin_project]
|
|
|
|
/// Generic wrapper around Log streams, mapping their content to a specific
|
|
|
|
/// deserialized log struct.
|
|
|
|
///
|
|
|
|
/// We use this wrapper type instead of `StreamExt::map` in order to preserve
|
|
|
|
/// information about the filter/subscription's id.
|
|
|
|
pub struct EventStream<'a, T, R, E> {
|
|
|
|
pub id: U256,
|
|
|
|
#[pin]
|
|
|
|
stream: T,
|
|
|
|
parse: MapEvent<'a, R, E>,
|
|
|
|
}
|
|
|
|
|
2021-07-30 11:01:38 +00:00
|
|
|
impl<'a, T, R, E> EventStream<'a, T, R, E> {
|
2021-12-23 19:23:55 +00:00
|
|
|
/// Turns this stream of events into a stream that also yields the event's metadata
|
2021-07-30 11:01:38 +00:00
|
|
|
pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> {
|
|
|
|
EventStreamMeta(self)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-30 09:33:06 +00:00
|
|
|
impl<'a, T, R, E> EventStream<'a, T, R, E> {
|
|
|
|
pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self {
|
|
|
|
Self { id, stream, parse }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a, T, R, E> Stream for EventStream<'a, T, R, E>
|
|
|
|
where
|
|
|
|
T: Stream<Item = Log> + Unpin,
|
|
|
|
{
|
|
|
|
type Item = Result<R, E>;
|
|
|
|
|
|
|
|
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
|
|
let mut this = self.project();
|
|
|
|
match futures_util::ready!(this.stream.poll_next_unpin(ctx)) {
|
|
|
|
Some(item) => Poll::Ready(Some((this.parse)(item))),
|
|
|
|
None => Poll::Pending,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-07-30 11:01:38 +00:00
|
|
|
|
2021-12-23 19:23:55 +00:00
|
|
|
impl<'a, T, R, E> EventStream<'a, T, R, E>
|
|
|
|
where
|
|
|
|
T: Stream<Item = Log> + Unpin + 'a,
|
|
|
|
R: 'a,
|
|
|
|
E: 'a,
|
|
|
|
{
|
|
|
|
/// This function will attempt to pull events from both event streams. Each
|
|
|
|
/// stream will be polled in a round-robin fashion, and whenever a stream is
|
|
|
|
/// ready to yield an event that event is yielded.
|
|
|
|
///
|
|
|
|
/// After one of the two event streams completes, the remaining one will be
|
|
|
|
/// polled exclusively. The returned stream completes when both input
|
|
|
|
/// streams have completed.
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// Note that this function consumes both streams and returns a wrapped
|
|
|
|
/// version of them.
|
|
|
|
/// The item of the wrapped stream is an `Either`, and the items that the `self` streams yields
|
|
|
|
/// will be stored in the left-hand variant of that `Either` and the other stream's (`st`) items
|
|
|
|
/// will be wrapped into the right-hand variant of that `Either`.
|
|
|
|
///
|
|
|
|
/// # Example
|
|
|
|
///
|
|
|
|
/// ```
|
2022-07-24 21:41:06 +00:00
|
|
|
/// # #[cfg(feature = "abigen")]
|
2021-12-23 19:23:55 +00:00
|
|
|
/// # async fn test<M:ethers_providers::Middleware>(contract: ethers_contract::Contract<M>) {
|
|
|
|
/// # use ethers_core::types::*;
|
|
|
|
/// # use futures_util::stream::StreamExt;
|
|
|
|
/// # use futures_util::future::Either;
|
|
|
|
/// # use ethers_contract::{Contract, ContractFactory, EthEvent};
|
|
|
|
///
|
|
|
|
/// #[derive(Clone, Debug, EthEvent)]
|
|
|
|
/// pub struct Approval {
|
|
|
|
/// #[ethevent(indexed)]
|
|
|
|
/// pub token_owner: Address,
|
|
|
|
/// #[ethevent(indexed)]
|
|
|
|
/// pub spender: Address,
|
|
|
|
/// pub tokens: U256,
|
|
|
|
/// }
|
|
|
|
///
|
|
|
|
/// #[derive(Clone, Debug, EthEvent)]
|
|
|
|
/// pub struct Transfer {
|
|
|
|
/// #[ethevent(indexed)]
|
|
|
|
/// pub from: Address,
|
|
|
|
/// #[ethevent(indexed)]
|
|
|
|
/// pub to: Address,
|
|
|
|
/// pub tokens: U256,
|
|
|
|
/// }
|
|
|
|
///
|
|
|
|
///
|
|
|
|
/// let ev1 = contract.event::<Approval>().from_block(1337).to_block(2000);
|
|
|
|
/// let ev2 = contract.event::<Transfer>();
|
|
|
|
///
|
|
|
|
/// let mut events = ev1.stream().await.unwrap().select(ev2.stream().await.unwrap()).ok();
|
|
|
|
///
|
|
|
|
/// while let Some(either) = events.next().await {
|
|
|
|
/// match either {
|
|
|
|
/// Either::Left(approval) => { let Approval{token_owner,spender,tokens} = approval; }
|
|
|
|
/// Either::Right(transfer) => { let Transfer{from,to,tokens} = transfer; }
|
|
|
|
/// }
|
|
|
|
/// }
|
|
|
|
///
|
|
|
|
/// # }
|
|
|
|
/// ```
|
|
|
|
pub fn select<St>(self, st: St) -> SelectEvent<SelectEither<'a, Result<R, E>, St::Item>>
|
|
|
|
where
|
|
|
|
St: Stream + Unpin + 'a,
|
|
|
|
{
|
|
|
|
SelectEvent(Box::pin(futures_util::stream::select(
|
|
|
|
self.map(Either::Left),
|
|
|
|
st.map(Either::Right),
|
|
|
|
)))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub type SelectEither<'a, L, R> = Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>>;
|
|
|
|
|
|
|
|
#[pin_project]
|
|
|
|
pub struct SelectEvent<T>(#[pin] T);
|
|
|
|
|
|
|
|
impl<'a, T, L, LE, R, RE> SelectEvent<T>
|
|
|
|
where
|
|
|
|
T: Stream<Item = Either<Result<L, LE>, Result<R, RE>>> + 'a,
|
|
|
|
L: 'a,
|
|
|
|
LE: 'a,
|
|
|
|
R: 'a,
|
|
|
|
RE: 'a,
|
|
|
|
{
|
|
|
|
/// Turns a stream of Results to a stream of `Result::ok` for both arms
|
|
|
|
pub fn ok(self) -> Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>> {
|
|
|
|
Box::pin(self.filter_map(|e| async move {
|
|
|
|
match e {
|
|
|
|
Either::Left(res) => res.ok().map(Either::Left),
|
|
|
|
Either::Right(res) => res.ok().map(Either::Right),
|
|
|
|
}
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T: Stream> Stream for SelectEvent<T> {
|
|
|
|
type Item = T::Item;
|
|
|
|
|
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
|
let this = self.project();
|
|
|
|
this.0.poll_next(cx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Wrapper around a `EventStream`, that in addition to the deserialized Event type also yields the
|
|
|
|
/// `LogMeta`.
|
2021-07-30 11:01:38 +00:00
|
|
|
#[pin_project]
|
|
|
|
pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>);
|
|
|
|
|
2021-12-23 19:23:55 +00:00
|
|
|
impl<'a, T, R, E> EventStreamMeta<'a, T, R, E>
|
|
|
|
where
|
|
|
|
T: Stream<Item = Log> + Unpin + 'a,
|
|
|
|
R: 'a,
|
|
|
|
E: 'a,
|
|
|
|
{
|
|
|
|
/// See `EventStream::select`
|
|
|
|
#[allow(clippy::type_complexity)]
|
|
|
|
pub fn select<St>(
|
|
|
|
self,
|
|
|
|
st: St,
|
|
|
|
) -> SelectEvent<SelectEither<'a, Result<(R, LogMeta), E>, St::Item>>
|
|
|
|
where
|
|
|
|
St: Stream + Unpin + 'a,
|
|
|
|
{
|
|
|
|
SelectEvent(Box::pin(futures_util::stream::select(
|
|
|
|
self.map(Either::Left),
|
|
|
|
st.map(Either::Right),
|
|
|
|
)))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-30 11:01:38 +00:00
|
|
|
impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E>
|
|
|
|
where
|
|
|
|
T: Stream<Item = Log> + Unpin,
|
|
|
|
{
|
|
|
|
type Item = Result<(R, LogMeta), E>;
|
|
|
|
|
|
|
|
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
|
|
let this = self.project();
|
|
|
|
match futures_util::ready!(this.0.stream.poll_next_unpin(ctx)) {
|
|
|
|
Some(item) => {
|
|
|
|
let meta = LogMeta::from(&item);
|
|
|
|
let res = (this.0.parse)(item);
|
|
|
|
let res = res.map(|inner| (inner, meta));
|
|
|
|
Poll::Ready(Some(res))
|
|
|
|
}
|
2021-08-19 07:01:49 +00:00
|
|
|
None => Poll::Ready(None),
|
2021-07-30 11:01:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|