fix: replace FilterStream with concrete type (#69)

* fix: replace FilterStream with concrete type

* fix: use PinBoxFut type alias

* ci: fix CI error with ledger
This commit is contained in:
Georgios Konstantopoulos 2020-09-23 11:04:54 +03:00 committed by GitHub
parent 8ff9a894c0
commit bf1d1e098f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 59 additions and 151 deletions

View File

@ -1,6 +1,6 @@
use crate::ContractError; use crate::ContractError;
use ethers_providers::{FilterStream, JsonRpcClient, Provider}; use ethers_providers::{JsonRpcClient, Provider};
use ethers_core::{ use ethers_core::{
abi::{Detokenize, Event as AbiEvent, RawLog}, abi::{Detokenize, Event as AbiEvent, RawLog},

View File

@ -113,17 +113,19 @@ mod pending_transaction;
pub use pending_transaction::PendingTransaction; pub use pending_transaction::PendingTransaction;
mod stream; mod stream;
pub use stream::{FilterStream, DEFAULT_POLL_INTERVAL};
// re-export `StreamExt` so that consumers can call `next()` on the `FilterStream`
// without having to import futures themselves
pub use futures_util::StreamExt; pub use futures_util::StreamExt;
pub use stream::{FilterWatcher, DEFAULT_POLL_INTERVAL};
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{error::Error, fmt::Debug}; use std::{error::Error, fmt::Debug, future::Future, pin::Pin};
pub use provider::{Provider, ProviderError}; pub use provider::{Provider, ProviderError};
// Helper type alias
pub(crate) type PinBoxFut<'a, T> =
Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a + Send>>;
#[async_trait] #[async_trait]
/// Trait which must be implemented by data transports to be used with the Ethereum /// Trait which must be implemented by data transports to be used with the Ethereum
/// JSON-RPC provider. /// JSON-RPC provider.

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
stream::{interval, DEFAULT_POLL_INTERVAL}, stream::{interval, DEFAULT_POLL_INTERVAL},
JsonRpcClient, Provider, ProviderError, JsonRpcClient, PinBoxFut, Provider, ProviderError,
}; };
use ethers_core::types::{TransactionReceipt, TxHash, U64}; use ethers_core::types::{TransactionReceipt, TxHash, U64};
use futures_core::stream::Stream; use futures_core::stream::Stream;
@ -168,9 +168,6 @@ impl<'a, P> Deref for PendingTransaction<'a, P> {
} }
} }
// Helper type alias
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a + Send>>;
// We box the TransactionReceipts to keep the enum small. // We box the TransactionReceipts to keep the enum small.
enum PendingTxState<'a> { enum PendingTxState<'a> {
/// Waiting for interval to elapse before calling API again /// Waiting for interval to elapse before calling API again

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
ens, ens,
stream::{FilterStream, FilterWatcher, DEFAULT_POLL_INTERVAL}, stream::{FilterWatcher, DEFAULT_POLL_INTERVAL},
Http as HttpProvider, JsonRpcClient, PendingTransaction, Http as HttpProvider, JsonRpcClient, PendingTransaction,
}; };
@ -321,32 +321,26 @@ impl<P: JsonRpcClient> Provider<P> {
} }
/// Streams matching filter logs /// Streams matching filter logs
pub async fn watch( pub async fn watch(&self, filter: &Filter) -> Result<FilterWatcher<'_, P, Log>, ProviderError> {
&self,
filter: &Filter,
) -> Result<impl FilterStream<Log> + '_, ProviderError> {
let id = self.new_filter(FilterKind::Logs(filter)).await?; let id = self.new_filter(FilterKind::Logs(filter)).await?;
let fut = move || Box::pin(self.get_filter_changes(id)); let filter = FilterWatcher::new(id, self).interval(self.get_interval());
let filter = FilterWatcher::new(id, fut).interval(self.get_interval());
Ok(filter) Ok(filter)
} }
/// Streams new block hashes /// Streams new block hashes
pub async fn watch_blocks(&self) -> Result<impl FilterStream<H256> + '_, ProviderError> { pub async fn watch_blocks(&self) -> Result<FilterWatcher<'_, P, H256>, ProviderError> {
let id = self.new_filter(FilterKind::NewBlocks).await?; let id = self.new_filter(FilterKind::NewBlocks).await?;
let fut = move || Box::pin(self.get_filter_changes(id)); let filter = FilterWatcher::new(id, self).interval(self.get_interval());
let filter = FilterWatcher::new(id, fut).interval(self.get_interval());
Ok(filter) Ok(filter)
} }
/// Streams pending transactions /// Streams pending transactions
pub async fn watch_pending_transactions( pub async fn watch_pending_transactions(
&self, &self,
) -> Result<impl FilterStream<H256> + '_, ProviderError> { ) -> Result<FilterWatcher<'_, P, H256>, ProviderError> {
let id = self.new_filter(FilterKind::PendingTransactions).await?; let id = self.new_filter(FilterKind::PendingTransactions).await?;
let fut = move || Box::pin(self.get_filter_changes(id)); let filter = FilterWatcher::new(id, self).interval(self.get_interval());
let filter = FilterWatcher::new(id, fut).interval(self.get_interval());
Ok(filter) Ok(filter)
} }

View File

@ -1,14 +1,13 @@
use crate::ProviderError; use crate::{JsonRpcClient, PinBoxFut, Provider};
use ethers_core::types::U256; use ethers_core::types::U256;
use futures_core::{stream::Stream, TryFuture}; use futures_core::stream::Stream;
use futures_timer::Delay; use futures_timer::Delay;
use futures_util::{stream, FutureExt, StreamExt}; use futures_util::{stream, FutureExt, StreamExt};
use pin_project::pin_project; use pin_project::pin_project;
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
future::Future,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,
@ -23,92 +22,66 @@ pub fn interval(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
/// The default polling interval for filters and pending transactions /// The default polling interval for filters and pending transactions
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(7000); pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(7000);
/// Trait for streaming filters. enum FilterWatcherState<'a, R> {
pub trait FilterStream<R>: StreamExt + Stream<Item = R>
where
R: for<'de> Deserialize<'de>,
{
/// Returns the filter's ID for it to be uninstalled
fn id(&self) -> U256;
/// Sets the stream's polling interval
fn interval(self, duration: Duration) -> Self;
/// Alias for Box::pin, must be called in order to pin the stream and be able
/// to call `next` on it.
fn stream(self) -> Pin<Box<Self>>
where
Self: Sized,
{
Box::pin(self)
}
}
enum FilterWatcherState<F, R> {
WaitForInterval, WaitForInterval,
GetFilterChanges(F), GetFilterChanges(PinBoxFut<'a, Vec<R>>),
NextItem(IntoIter<R>), NextItem(IntoIter<R>),
} }
#[must_use = "filters do nothing unless you stream them"] #[must_use = "filters do nothing unless you stream them"]
#[pin_project] #[pin_project]
pub(crate) struct FilterWatcher<F: FutureFactory, R> { pub struct FilterWatcher<'a, P, R> {
id: U256, /// The filter's installed id on the ethereum node
pub id: U256,
#[pin] provider: &'a Provider<P>,
// Future factory for generating new calls on each loop
factory: F,
// The polling interval // The polling interval
interval: Box<dyn Stream<Item = ()> + Send + Unpin>, interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
state: FilterWatcherState<F::FutureItem, R>, state: FilterWatcherState<'a, R>,
} }
impl<F, R> FilterWatcher<F, R> impl<'a, P, R> FilterWatcher<'a, P, R>
where where
F: FutureFactory, P: JsonRpcClient,
R: for<'de> Deserialize<'de>, R: for<'de> Deserialize<'de>,
{ {
/// Creates a new watcher with the provided factory and filter id. /// Creates a new watcher with the provided factory and filter id.
pub fn new<T: Into<U256>>(id: T, factory: F) -> Self { pub fn new<T: Into<U256>>(id: T, provider: &'a Provider<P>) -> Self {
Self { Self {
id: id.into(), id: id.into(),
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)), interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
state: FilterWatcherState::WaitForInterval, state: FilterWatcherState::WaitForInterval,
factory, provider,
}
} }
} }
impl<F, R> FilterStream<R> for FilterWatcher<F, R> /// Sets the stream's polling interval
where pub fn interval(mut self, duration: Duration) -> Self {
F: FutureFactory,
F::FutureItem: Future<Output = Result<Vec<R>, ProviderError>>,
R: for<'de> Deserialize<'de>,
{
fn id(&self) -> U256 {
self.id
}
fn interval(mut self, duration: Duration) -> Self {
self.interval = Box::new(interval(duration)); self.interval = Box::new(interval(duration));
self self
} }
/// Alias for Box::pin, must be called in order to pin the stream and be able
/// to call `next` on it.
pub fn stream(self) -> Pin<Box<Self>> {
Box::pin(self)
}
} }
// Pattern for flattening the returned Vec of filter changes taken from // Pattern for flattening the returned Vec of filter changes taken from
// https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67 // https://github.com/tomusdrw/rust-web3/blob/f043b222744580bf4be043da757ab0b300c3b2da/src/api/eth_filter.rs#L50-L67
impl<F, R> Stream for FilterWatcher<F, R> impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where where
F: FutureFactory, P: JsonRpcClient,
F::FutureItem: Future<Output = Result<Vec<R>, ProviderError>>, R: for<'de> Deserialize<'de> + 'a,
R: for<'de> Deserialize<'de>,
{ {
type Item = R; type Item = R;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let this = self.project();
let id = *this.id;
*this.state = match this.state { *this.state = match this.state {
FilterWatcherState::WaitForInterval => { FilterWatcherState::WaitForInterval => {
@ -117,14 +90,15 @@ where
// create a new instance of the future // create a new instance of the future
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
FilterWatcherState::GetFilterChanges(this.factory.as_mut().new()) let fut = Box::pin(this.provider.get_filter_changes(id));
FilterWatcherState::GetFilterChanges(fut)
} }
FilterWatcherState::GetFilterChanges(fut) => { FilterWatcherState::GetFilterChanges(fut) => {
// NOTE: If the provider returns an error, this will return an empty // NOTE: If the provider returns an error, this will return an empty
// vector. Should we make this return a Result instead? Ideally if we're // vector. Should we make this return a Result instead? Ideally if we're
// in a streamed loop we wouldn't want the loop to terminate if an error // in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error). // is encountered (since it might be a temporary error).
let items: Vec<R> = futures_util::ready!(fut.poll_unpin(cx)).unwrap_or_default(); let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
FilterWatcherState::NextItem(items.into_iter()) FilterWatcherState::NextItem(items.into_iter())
} }
@ -144,66 +118,3 @@ where
Poll::Pending Poll::Pending
} }
} }
// Do not leak private trait
// Pattern for re-usable futures from: https://gitlab.com/Ploppz/futures-retry/-/blob/std-futures/src/future.rs#L13
use factory::FutureFactory;
mod factory {
use super::*;
/// A factory trait used to create futures.
///
/// We need a factory for the stream logic because when (and if) a future
/// is polled to completion, it can't be polled again. Hence we need to
/// create a new one.
///
/// This trait is implemented for any closure that returns a `Future`, so you don't
/// have to write your own type and implement it to handle some simple cases.
pub trait FutureFactory {
/// A future type that is created by the `new` method.
type FutureItem: TryFuture + Unpin;
/// Creates a new future. We don't need the factory to be immutable so we
/// pass `self` as a mutable reference.
fn new(self: Pin<&mut Self>) -> Self::FutureItem;
}
impl<T, F> FutureFactory for T
where
T: Unpin + FnMut() -> F,
F: TryFuture + Unpin,
{
type FutureItem = F;
#[allow(clippy::new_ret_no_self)]
fn new(self: Pin<&mut Self>) -> F {
(*self.get_mut())()
}
}
}
#[cfg(test)]
mod watch {
use super::*;
use futures_util::StreamExt;
#[tokio::test]
async fn stream() {
let factory = || Box::pin(async { Ok::<Vec<u64>, ProviderError>(vec![1, 2, 3]) });
let filter = FilterWatcher::<_, u64>::new(1, factory);
// stream combinator calls are still doable since FilterStream extends
// Stream and StreamExt
let mut stream = filter
.interval(Duration::from_millis(100u64))
.stream()
.map(|x| 2 * x);
assert_eq!(stream.next().await.unwrap(), 2);
assert_eq!(stream.next().await.unwrap(), 4);
assert_eq!(stream.next().await.unwrap(), 6);
// this will poll the factory function again since it consumed the entire
// vector, so it'll wrap around. Realistically, we'd then sleep for a few seconds
// until new blocks are mined, until the call to the factory returns a non-empty
// vector of logs
assert_eq!(stream.next().await.unwrap(), 2);
}
}

View File

@ -70,7 +70,7 @@ mod eth_tests {
#[cfg(feature = "tokio-runtime")] #[cfg(feature = "tokio-runtime")]
async fn watch_blocks_websocket() { async fn watch_blocks_websocket() {
use ethers::{ use ethers::{
providers::{FilterStream, StreamExt, Ws}, providers::{StreamExt, Ws},
types::H256, types::H256,
}; };
@ -154,10 +154,7 @@ mod eth_tests {
#[cfg(feature = "celo")] #[cfg(feature = "celo")]
mod celo_tests { mod celo_tests {
use super::*; use super::*;
use ethers::{ use ethers::types::{Randomness, H256};
providers::FilterStream,
types::{Randomness, H256},
};
use futures_util::stream::StreamExt; use futures_util::stream::StreamExt;
use rustc_hex::FromHex; use rustc_hex::FromHex;

View File

@ -43,7 +43,10 @@ pub use wallet::Wallet;
#[cfg(feature = "ledger")] #[cfg(feature = "ledger")]
mod ledger; mod ledger;
#[cfg(feature = "ledger")] #[cfg(feature = "ledger")]
pub use ledger::{app::LedgerEthereum as Ledger, types::{LedgerError, DerivationType as HDPath}}; pub use ledger::{
app::LedgerEthereum as Ledger,
types::{DerivationType as HDPath, LedgerError},
};
mod nonce_manager; mod nonce_manager;
pub(crate) use nonce_manager::NonceManager; pub(crate) use nonce_manager::NonceManager;

View File

@ -1,8 +1,9 @@
use anyhow::Result;
use ethers::{utils::parse_ether, prelude::*};
#[tokio::main] #[tokio::main]
#[cfg(feature = "ledger")]
async fn main() -> Result<()> { async fn main() -> Result<()> {
use anyhow::Result;
use ethers::{prelude::*, utils::parse_ether};
// Connect over websockets // Connect over websockets
let provider = Provider::new(Ws::connect("ws://localhost:8545").await?); let provider = Provider::new(Ws::connect("ws://localhost:8545").await?);
// Instantiate the connection to ledger with Ledger Live derivation path and // Instantiate the connection to ledger with Ledger Live derivation path and
@ -20,6 +21,9 @@ async fn main() -> Result<()> {
let tx_hash = client.send_transaction(tx, None).await?; let tx_hash = client.send_transaction(tx, None).await?;
// Get the receipt // Get the receipt
let receipt = client.pending_transaction(tx_hash).confirmations(3).await?; let _receipt = client.pending_transaction(tx_hash).confirmations(3).await?;
Ok(()) Ok(())
} }
#[cfg(not(feature = "ledger"))]
fn main() {}