Prestwich/super pending (#566)
* feature: pending_escalator * feature: send_escalating in Middleware * bug: don't drop unready futures in escalator.poll * chore: docs and must_use * chores: lints, clippies, wasm fixes, dedup pinboxfut * chore: more lints * refactor: use Delay in polling interval to ensure re-waking * refactor: simplify Sleeping state transition as last will never be None again * bug: properly set last when broadcasts resolve * feature: debug implementation for EscalatingPending * refactor: with_ setters and escalations argument * refactor: use FuturesUnOrdered instead of a vec of futures * chore: update CHANGELOG with recent PR info * chore: update all comments on pending escalator * chore: run rustfmt
This commit is contained in:
parent
e9d40120a7
commit
203b2e8ea3
|
@ -15,6 +15,9 @@
|
|||
- Use rust types as contract function inputs for human readable abi [#482](https://github.com/gakonst/ethers-rs/pull/482)
|
||||
- Add EIP-712 `sign_typed_data` signer method; add ethers-core type `Eip712` trait and derive macro in ethers-derive-eip712 [#481](https://github.com/gakonst/ethers-rs/pull/481)
|
||||
- `LocalWallet::new_keystore` now returns a tuple `(LocalWallet, String)` instead of `LocalWallet`, where the string represents the UUID of the newly created encrypted JSON keystore. The JSON keystore is stored as a file `/dir/uuid`. The issue [#557](https://github.com/gakonst/ethers-rs/issues/557) is addressed [#559](https://github.com/gakonst/ethers-rs/pull/559)
|
||||
- add the missing constructor for `Timelag` middleware via [#568](https://github.com/gakonst/ethers-rs/pull/568)
|
||||
- re-export error types for `Http` and `Ws` providers in [#570](https://github.com/gakonst/ethers-rs/pull/570)
|
||||
- add a method on the `Middleware` to broadcast a tx with a series of escalating gas prices via [#566](https://github.com/gakonst/ethers-rs/pull/566)
|
||||
|
||||
### 0.5.3
|
||||
|
||||
|
|
|
@ -64,6 +64,7 @@
|
|||
//! # }
|
||||
//! ```
|
||||
mod transports;
|
||||
use futures_util::future::join_all;
|
||||
pub use transports::*;
|
||||
|
||||
mod provider;
|
||||
|
@ -74,6 +75,9 @@ pub mod ens;
|
|||
mod pending_transaction;
|
||||
pub use pending_transaction::PendingTransaction;
|
||||
|
||||
mod pending_escalator;
|
||||
pub use pending_escalator::EscalatingPending;
|
||||
|
||||
mod stream;
|
||||
pub use futures_util::StreamExt;
|
||||
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
|
||||
|
@ -89,6 +93,9 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr};
|
|||
|
||||
pub use provider::{FilterKind, Provider, ProviderError};
|
||||
|
||||
/// A simple gas escalation policy
|
||||
pub type EscalationPolicy = Box<dyn Fn(U256, usize) -> U256 + Send + Sync>;
|
||||
|
||||
// Helper type alias
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub(crate) type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;
|
||||
|
@ -283,6 +290,45 @@ pub trait Middleware: Sync + Send + Debug {
|
|||
self.inner().send_transaction(tx, block).await.map_err(FromErr::from)
|
||||
}
|
||||
|
||||
/// Send a transaction with a simple escalation policy.
|
||||
///
|
||||
/// `policy` should be a boxed function that maps `original_gas_price`
|
||||
/// and `number_of_previous_escalations` -> `new_gas_price`.
|
||||
///
|
||||
/// e.g. `Box::new(|start, escalation_index| start * 1250.pow(escalations) /
|
||||
/// 1000.pow(escalations))`
|
||||
async fn send_escalating<'a>(
|
||||
&'a self,
|
||||
tx: &TypedTransaction,
|
||||
escalations: usize,
|
||||
policy: EscalationPolicy,
|
||||
) -> Result<EscalatingPending<'a, Self::Provider>, Self::Error> {
|
||||
let mut original = tx.clone();
|
||||
self.fill_transaction(&mut original, None).await?;
|
||||
let gas_price = original.gas_price().expect("filled");
|
||||
let chain_id = self.get_chainid().await?.low_u64();
|
||||
let sign_futs: Vec<_> = (0..escalations)
|
||||
.map(|i| {
|
||||
let new_price = policy(gas_price, i);
|
||||
let mut r = original.clone();
|
||||
r.set_gas_price(new_price);
|
||||
r
|
||||
})
|
||||
.map(|req| async move {
|
||||
self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default())
|
||||
.await
|
||||
.map(|sig| req.rlp_signed(chain_id, &sig))
|
||||
})
|
||||
.collect();
|
||||
|
||||
// we reverse for convenience. Ensuring that we can always just
|
||||
// `pop()` the next tx off the back later
|
||||
let mut signed = join_all(sign_futs).await.into_iter().collect::<Result<Vec<_>, _>>()?;
|
||||
signed.reverse();
|
||||
|
||||
Ok(EscalatingPending::new(self.provider(), signed))
|
||||
}
|
||||
|
||||
async fn resolve_name(&self, ens_name: &str) -> Result<Address, Self::Error> {
|
||||
self.inner().resolve_name(ens_name).await.map_err(FromErr::from)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,240 @@
|
|||
use ethers_core::types::{Bytes, TransactionReceipt, H256};
|
||||
use futures_util::{stream::FuturesUnordered, StreamExt};
|
||||
use pin_project::pin_project;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::Poll,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use futures_timer::Delay;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_timer::Delay;
|
||||
|
||||
use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError};
|
||||
|
||||
/// States for the EscalatingPending future
|
||||
enum EscalatorStates<'a, P> {
|
||||
Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
|
||||
Sleeping(Pin<Box<Delay>>),
|
||||
BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
|
||||
CheckingReceipts(FuturesUnordered<PinBoxFut<'a, Option<TransactionReceipt>>>),
|
||||
Completed,
|
||||
}
|
||||
|
||||
/// An EscalatingPending is a pending transaction that increases its own gas
|
||||
/// price over time, by broadcasting successive versions with higher gas prices.
|
||||
#[must_use]
|
||||
#[pin_project(project = PendingProj)]
|
||||
#[derive(Debug)]
|
||||
pub struct EscalatingPending<'a, P>
|
||||
where
|
||||
P: JsonRpcClient,
|
||||
{
|
||||
provider: &'a Provider<P>,
|
||||
broadcast_interval: Duration,
|
||||
polling_interval: Duration,
|
||||
txns: Vec<Bytes>,
|
||||
last: Instant,
|
||||
sent: Vec<H256>,
|
||||
state: EscalatorStates<'a, P>,
|
||||
}
|
||||
|
||||
impl<'a, P> EscalatingPending<'a, P>
|
||||
where
|
||||
P: JsonRpcClient,
|
||||
{
|
||||
/// Instantiate a new EscalatingPending. This should only be called by the
|
||||
/// Middleware trait.
|
||||
///
|
||||
/// Callers MUST ensure that transactions are in _reverse_ broadcast order
|
||||
/// (this just makes writing the code easier, as we can use `pop()` a lot).
|
||||
///
|
||||
/// TODO: consider deserializing and checking invariants (gas order, etc.)
|
||||
pub(crate) fn new(provider: &'a Provider<P>, mut txns: Vec<Bytes>) -> Self {
|
||||
if txns.is_empty() {
|
||||
panic!("bad args");
|
||||
}
|
||||
|
||||
let first = txns.pop().expect("bad args");
|
||||
// Sane-feeling default intervals
|
||||
Self {
|
||||
provider,
|
||||
broadcast_interval: Duration::from_millis(150),
|
||||
polling_interval: Duration::from_millis(10),
|
||||
txns,
|
||||
// placeholder value. We set this again after the initial broadcast
|
||||
// future resolves
|
||||
last: Instant::now(),
|
||||
sent: vec![],
|
||||
state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the broadcast interval. This controls how often the escalator
|
||||
/// broadcasts a new transaction at a higher gas price
|
||||
pub fn with_broadcast_interval(mut self, duration: impl Into<Duration>) -> Self {
|
||||
self.broadcast_interval = duration.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the polling interval. This controls how often the escalator checks
|
||||
/// transaction receipts for confirmation.
|
||||
pub fn with_polling_interval(mut self, duration: impl Into<Duration>) -> Self {
|
||||
self.polling_interval = duration.into();
|
||||
self
|
||||
}
|
||||
|
||||
/// Get the current polling interval.
|
||||
pub fn get_polling_interval(&self) -> Duration {
|
||||
self.polling_interval
|
||||
}
|
||||
|
||||
/// Get the current broadcast interval.
|
||||
pub fn get_broadcast_interval(&self) -> Duration {
|
||||
self.broadcast_interval
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! check_all_receipts {
|
||||
($cx:ident, $this:ident) => {
|
||||
let futs: futures_util::stream::FuturesUnordered<_> = $this
|
||||
.sent
|
||||
.iter()
|
||||
.map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash))
|
||||
.collect();
|
||||
*$this.state = CheckingReceipts(futs);
|
||||
$cx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! sleep {
|
||||
($cx:ident, $this:ident) => {
|
||||
*$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval)));
|
||||
$cx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! completed {
|
||||
($this:ident, $output:expr) => {
|
||||
*$this.state = Completed;
|
||||
return Poll::Ready($output)
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! poll_broadcast_fut {
|
||||
($cx:ident, $this:ident, $fut:ident) => {
|
||||
match $fut.as_mut().poll($cx) {
|
||||
Poll::Ready(Ok(pending)) => {
|
||||
*$this.last = Instant::now();
|
||||
$this.sent.push(*pending);
|
||||
tracing::info!(
|
||||
tx_hash = ?*pending,
|
||||
escalation = $this.sent.len(),
|
||||
"Escalation transaction broadcast complete"
|
||||
);
|
||||
check_all_receipts!($cx, $this);
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
tracing::error!(
|
||||
error = ?e,
|
||||
"Error during transaction broadcast"
|
||||
);
|
||||
completed!($this, Err(e));
|
||||
}
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl<'a, P> Future for EscalatingPending<'a, P>
|
||||
where
|
||||
P: JsonRpcClient,
|
||||
{
|
||||
type Output = Result<TransactionReceipt, ProviderError>;
|
||||
|
||||
fn poll(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
use EscalatorStates::*;
|
||||
|
||||
let this = self.project();
|
||||
|
||||
match this.state {
|
||||
// In the initial state we're simply waiting on the first
|
||||
// transaction braodcast to complete.
|
||||
Initial(fut) => {
|
||||
poll_broadcast_fut!(cx, this, fut);
|
||||
}
|
||||
Sleeping(delay) => {
|
||||
let _ready = futures_util::ready!(delay.as_mut().poll(cx));
|
||||
// if broadcast timer has elapsed and if we have a TX to
|
||||
// broadcast, broadcast it
|
||||
if this.last.elapsed() > *this.broadcast_interval {
|
||||
if let Some(next_to_broadcast) = this.txns.pop() {
|
||||
let fut = this.provider.send_raw_transaction(next_to_broadcast);
|
||||
*this.state = BroadcastingNew(fut);
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
}
|
||||
}
|
||||
check_all_receipts!(cx, this);
|
||||
}
|
||||
// This state is functionally equivalent to Initial, but we
|
||||
// differentiate it for clarity
|
||||
BroadcastingNew(fut) => {
|
||||
poll_broadcast_fut!(cx, this, fut);
|
||||
}
|
||||
CheckingReceipts(futs) => {
|
||||
// Poll the set of `get_transaction_receipt` futures to check
|
||||
// if any previously-broadcast transaction was confirmed.
|
||||
// Continue doing this until all are resolved
|
||||
match futs.poll_next_unpin(cx) {
|
||||
// We have found a receipt. This means that all other
|
||||
// broadcast txns are now invalid, so we can drop the
|
||||
// futures and complete
|
||||
Poll::Ready(Some(Ok(Some(receipt)))) => {
|
||||
completed!(this, Ok(receipt));
|
||||
}
|
||||
// A `get_transaction_receipt` request resolved, but but we
|
||||
// found no receipt, rewake and check if any other requests
|
||||
// are resolved
|
||||
Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(),
|
||||
// A request errored. We complete the future with the error.
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
completed!(this, Err(e));
|
||||
}
|
||||
// We have run out of `get_transaction_receipt` requests.
|
||||
// Sleep and then check if we should broadcast again (or
|
||||
// check receipts again)
|
||||
Poll::Ready(None) => {
|
||||
sleep!(cx, this);
|
||||
}
|
||||
// No request has resolved yet. Try again later
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
Completed => panic!("polled after completion"),
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let state = match self {
|
||||
Self::Initial(_) => "Initial",
|
||||
Self::Sleeping(_) => "Sleeping",
|
||||
Self::BroadcastingNew(_) => "BroadcastingNew",
|
||||
Self::CheckingReceipts(_) => "CheckingReceipts",
|
||||
Self::Completed => "Completed",
|
||||
};
|
||||
f.debug_struct("EscalatorStates").field("state", &state).finish()
|
||||
}
|
||||
}
|
|
@ -10,6 +10,7 @@ use crate::{
|
|||
use crate::CeloMiddleware;
|
||||
use crate::Middleware;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use ethers_core::{
|
||||
abi::{self, Detokenize, ParamType},
|
||||
types::{
|
||||
|
|
Loading…
Reference in New Issue