2020-09-24 21:33:09 +00:00
|
|
|
use crate::{JsonRpcClient, Middleware, PinBoxFut, Provider};
|
2020-06-15 08:46:07 +00:00
|
|
|
|
|
|
|
use ethers_core::types::U256;
|
|
|
|
|
2020-09-23 08:04:54 +00:00
|
|
|
use futures_core::stream::Stream;
|
2020-06-21 07:17:11 +00:00
|
|
|
use futures_timer::Delay;
|
|
|
|
use futures_util::{stream, FutureExt, StreamExt};
|
2020-06-15 08:46:07 +00:00
|
|
|
use pin_project::pin_project;
|
2020-12-24 20:23:05 +00:00
|
|
|
use serde::{de::DeserializeOwned, Serialize};
|
2020-06-15 08:46:07 +00:00
|
|
|
use std::{
|
2020-12-24 20:23:05 +00:00
|
|
|
fmt::Debug,
|
2020-06-15 08:46:07 +00:00
|
|
|
pin::Pin,
|
|
|
|
task::{Context, Poll},
|
|
|
|
time::Duration,
|
|
|
|
vec::IntoIter,
|
|
|
|
};
|
2020-06-21 07:17:11 +00:00
|
|
|
|
|
|
|
// https://github.com/tomusdrw/rust-web3/blob/befcb2fb8f3ca0a43e3081f68886fa327e64c8e6/src/api/eth_filter.rs#L20
|
2020-06-21 08:09:19 +00:00
|
|
|
pub fn interval(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
|
2020-06-21 07:17:11 +00:00
|
|
|
stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
|
|
|
|
}
|
2020-06-15 08:46:07 +00:00
|
|
|
|
2020-06-22 08:44:08 +00:00
|
|
|
/// The default polling interval for filters and pending transactions
|
|
|
|
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(7000);
|
2020-06-15 08:46:07 +00:00
|
|
|
|
2020-09-23 08:04:54 +00:00
|
|
|
enum FilterWatcherState<'a, R> {
|
2020-06-15 08:46:07 +00:00
|
|
|
WaitForInterval,
|
2020-09-23 08:04:54 +00:00
|
|
|
GetFilterChanges(PinBoxFut<'a, Vec<R>>),
|
2020-06-15 08:46:07 +00:00
|
|
|
NextItem(IntoIter<R>),
|
|
|
|
}
|
|
|
|
|
|
|
|
#[must_use = "filters do nothing unless you stream them"]
|
|
|
|
#[pin_project]
|
2021-03-31 08:21:20 +00:00
|
|
|
/// Streams data from an installed filter via `eth_getFilterChanges`
|
2020-09-23 08:04:54 +00:00
|
|
|
pub struct FilterWatcher<'a, P, R> {
|
|
|
|
/// The filter's installed id on the ethereum node
|
|
|
|
pub id: U256,
|
2020-06-15 08:46:07 +00:00
|
|
|
|
2020-09-23 08:04:54 +00:00
|
|
|
provider: &'a Provider<P>,
|
2020-06-15 08:46:07 +00:00
|
|
|
|
|
|
|
// The polling interval
|
2020-06-21 07:17:11 +00:00
|
|
|
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
|
2020-06-15 08:46:07 +00:00
|
|
|
|
2020-09-23 08:04:54 +00:00
|
|
|
state: FilterWatcherState<'a, R>,
|
2020-06-15 08:46:07 +00:00
|
|
|
}
|
|
|
|
|
2020-09-23 08:04:54 +00:00
|
|
|
impl<'a, P, R> FilterWatcher<'a, P, R>
|
2020-06-15 08:46:07 +00:00
|
|
|
where
|
2020-09-23 08:04:54 +00:00
|
|
|
P: JsonRpcClient,
|
2020-11-30 09:33:06 +00:00
|
|
|
R: Send + Sync + DeserializeOwned,
|
2020-06-15 08:46:07 +00:00
|
|
|
{
|
|
|
|
/// Creates a new watcher with the provided factory and filter id.
|
2020-09-23 08:04:54 +00:00
|
|
|
pub fn new<T: Into<U256>>(id: T, provider: &'a Provider<P>) -> Self {
|
2020-06-15 08:46:07 +00:00
|
|
|
Self {
|
|
|
|
id: id.into(),
|
2020-06-22 08:44:08 +00:00
|
|
|
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
|
2020-06-15 08:46:07 +00:00
|
|
|
state: FilterWatcherState::WaitForInterval,
|
2020-09-23 08:04:54 +00:00
|
|
|
provider,
|
2020-06-15 08:46:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-23 08:04:54 +00:00
|
|
|
/// Sets the stream's polling interval
|
|
|
|
pub fn interval(mut self, duration: Duration) -> Self {
|
2020-06-22 08:44:08 +00:00
|
|
|
self.interval = Box::new(interval(duration));
|
2020-06-15 08:46:07 +00:00
|
|
|
self
|
|
|
|
}
|
2020-09-23 08:04:54 +00:00
|
|
|
|
|
|
|
/// Alias for Box::pin, must be called in order to pin the stream and be able
|
|
|
|
/// to call `next` on it.
|
|
|
|
pub fn stream(self) -> Pin<Box<Self>> {
|
|
|
|
Box::pin(self)
|
|
|
|
}
|
2020-06-15 08:46:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Pattern for flattening the returned Vec of filter changes taken from
|
|
|
|
// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67
|
2020-09-23 08:04:54 +00:00
|
|
|
impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
|
2020-06-15 08:46:07 +00:00
|
|
|
where
|
2020-09-23 08:04:54 +00:00
|
|
|
P: JsonRpcClient,
|
2020-12-24 20:23:05 +00:00
|
|
|
R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a,
|
2020-06-15 08:46:07 +00:00
|
|
|
{
|
|
|
|
type Item = R;
|
|
|
|
|
2020-09-23 08:04:54 +00:00
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
|
|
|
let this = self.project();
|
|
|
|
let id = *this.id;
|
2020-06-15 08:46:07 +00:00
|
|
|
|
2020-06-17 18:01:20 +00:00
|
|
|
*this.state = match this.state {
|
|
|
|
FilterWatcherState::WaitForInterval => {
|
|
|
|
// Wait the polling period
|
2020-06-21 07:17:11 +00:00
|
|
|
let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));
|
2020-06-17 18:01:20 +00:00
|
|
|
|
|
|
|
// create a new instance of the future
|
2020-06-21 07:17:11 +00:00
|
|
|
cx.waker().wake_by_ref();
|
2020-09-23 08:04:54 +00:00
|
|
|
let fut = Box::pin(this.provider.get_filter_changes(id));
|
|
|
|
FilterWatcherState::GetFilterChanges(fut)
|
2020-06-17 18:01:20 +00:00
|
|
|
}
|
|
|
|
FilterWatcherState::GetFilterChanges(fut) => {
|
|
|
|
// NOTE: If the provider returns an error, this will return an empty
|
|
|
|
// vector. Should we make this return a Result instead? Ideally if we're
|
|
|
|
// in a streamed loop we wouldn't want the loop to terminate if an error
|
|
|
|
// is encountered (since it might be a temporary error).
|
2020-09-23 08:04:54 +00:00
|
|
|
let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
|
2020-08-31 20:40:49 +00:00
|
|
|
cx.waker().wake_by_ref();
|
2020-06-17 18:01:20 +00:00
|
|
|
FilterWatcherState::NextItem(items.into_iter())
|
|
|
|
}
|
|
|
|
// Consume 1 element from the vector. If more elements are in the vector,
|
|
|
|
// the next call will immediately go to this branch instead of trying to get
|
|
|
|
// filter changes again. Once the whole vector is consumed, it will poll again
|
|
|
|
// for new logs
|
2020-08-31 20:40:49 +00:00
|
|
|
FilterWatcherState::NextItem(iter) => {
|
|
|
|
cx.waker().wake_by_ref();
|
|
|
|
match iter.next() {
|
|
|
|
Some(item) => return Poll::Ready(Some(item)),
|
|
|
|
None => FilterWatcherState::WaitForInterval,
|
2020-06-21 07:17:11 +00:00
|
|
|
}
|
2020-08-31 20:40:49 +00:00
|
|
|
}
|
2020-06-17 18:01:20 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
Poll::Pending
|
2020-06-15 08:46:07 +00:00
|
|
|
}
|
|
|
|
}
|