feat(providers): load previous logs before subscribing (#1264)

* feat(providers): load previous logs before subscribing

Load previous logs and stream it back to the user before establishing
a new stream for streaming future logs.

Closes #988

* docs: add subscribe_logs example

* fix clippy errors

* refactor: use VecDeque and address review
This commit is contained in:
Meet Mangukiya 2022-05-16 20:41:25 +05:30 committed by GitHub
parent 3df1527cef
commit b15d0f8643
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 71 additions and 6 deletions

View File

@ -158,6 +158,8 @@
### Unreleased ### Unreleased
- Load previous logs before subscribing to new logs in case fromBlock is set
[1264](https://github.com/gakonst/ethers-rs/pull/1264)
- Add retries to the pending transaction future - Add retries to the pending transaction future
[1221](https://github.com/gakonst/ethers-rs/pull/1221) [1221](https://github.com/gakonst/ethers-rs/pull/1221)
- Add support for basic and bearer authentication in http and non-wasm websockets. - Add support for basic and bearer authentication in http and non-wasm websockets.

View File

@ -16,9 +16,9 @@ use ethers_core::{
types::{ types::{
transaction::{eip2718::TypedTransaction, eip2930::AccessListWithGasUsed}, transaction::{eip2718::TypedTransaction, eip2930::AccessListWithGasUsed},
Address, Block, BlockId, BlockNumber, BlockTrace, Bytes, EIP1186ProofResponse, FeeHistory, Address, Block, BlockId, BlockNumber, BlockTrace, Bytes, EIP1186ProofResponse, FeeHistory,
Filter, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter, TraceType, Filter, FilterBlockOption, Log, NameOrAddress, Selector, Signature, Trace, TraceFilter,
Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent, TxpoolInspect, TraceType, Transaction, TransactionReceipt, TransactionRequest, TxHash, TxpoolContent,
TxpoolStatus, H256, U256, U64, TxpoolInspect, TxpoolStatus, H256, U256, U64,
}, },
utils, utils,
}; };
@ -28,7 +28,9 @@ use thiserror::Error;
use url::{ParseError, Url}; use url::{ParseError, Url};
use futures_util::{lock::Mutex, try_join}; use futures_util::{lock::Mutex, try_join};
use std::{convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration}; use std::{
collections::VecDeque, convert::TryFrom, fmt::Debug, str::FromStr, sync::Arc, time::Duration,
};
use tracing::trace; use tracing::trace;
use tracing_futures::Instrument; use tracing_futures::Instrument;
@ -1102,9 +1104,24 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
where where
P: PubsubClient, P: PubsubClient,
{ {
let loaded_logs = match filter.block_option {
FilterBlockOption::Range { from_block, to_block: _ } => {
if from_block.is_none() {
vec![]
} else {
self.get_logs(filter).await?
}
}
FilterBlockOption::AtBlockHash(_block_hash) => self.get_logs(filter).await?,
};
let loaded_logs = VecDeque::from(loaded_logs);
let logs = utils::serialize(&"logs"); // TODO: Make this a static let logs = utils::serialize(&"logs"); // TODO: Make this a static
let filter = utils::serialize(filter); let filter = utils::serialize(filter);
self.subscribe([logs, filter]).await self.subscribe([logs, filter]).await.map(|mut stream| {
stream.set_loaded_elements(loaded_logs);
stream
})
} }
async fn fee_history<T: Into<U256> + Send + Sync>( async fn fee_history<T: Into<U256> + Send + Sync>(

View File

@ -7,6 +7,7 @@ use pin_project::{pin_project, pinned_drop};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::{ use std::{
collections::VecDeque,
marker::PhantomData, marker::PhantomData,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -31,6 +32,8 @@ pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
/// The subscription's installed id on the ethereum node /// The subscription's installed id on the ethereum node
pub id: U256, pub id: U256,
loaded_elements: VecDeque<R>,
provider: &'a Provider<P>, provider: &'a Provider<P>,
#[pin] #[pin]
@ -54,13 +57,17 @@ where
pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> { pub fn new(id: U256, provider: &'a Provider<P>) -> Result<Self, P::Error> {
// Call the underlying PubsubClient's subscribe // Call the underlying PubsubClient's subscribe
let rx = provider.as_ref().subscribe(id)?; let rx = provider.as_ref().subscribe(id)?;
Ok(Self { id, provider, rx, ret: PhantomData }) Ok(Self { id, provider, rx, ret: PhantomData, loaded_elements: VecDeque::new() })
} }
/// Unsubscribes from the subscription. /// Unsubscribes from the subscription.
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> { pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await self.provider.unsubscribe(self.id).await
} }
pub fn set_loaded_elements(&mut self, loaded_elements: VecDeque<R>) {
self.loaded_elements = loaded_elements;
}
} }
// Each subscription item is a serde_json::Value which must be decoded to the // Each subscription item is a serde_json::Value which must be decoded to the
@ -74,6 +81,11 @@ where
type Item = R; type Item = R;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
if !self.loaded_elements.is_empty() {
let next_element = self.get_mut().loaded_elements.pop_front();
return Poll::Ready(next_element)
}
let this = self.project(); let this = self.project();
match futures_util::ready!(this.rx.poll_next(ctx)) { match futures_util::ready!(this.rx.poll_next(ctx)) {
Some(item) => match serde_json::from_str(item.get()) { Some(item) => match serde_json::from_str(item.get()) {

View File

@ -0,0 +1,34 @@
use ethers::{abi::AbiDecode, prelude::*, utils::keccak256};
use eyre::Result;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<()> {
let client =
Provider::<Ws>::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27")
.await?;
let client = Arc::new(client);
let last_block = client.get_block(BlockNumber::Latest).await?.unwrap().number.unwrap();
println!("last_block: {}", last_block);
let erc20_transfer_filter = Filter::new()
.from_block(last_block - 25)
.topic0(ValueOrArray::Value(H256::from(keccak256("Transfer(address,address,uint256)"))));
let mut stream = client.subscribe_logs(&erc20_transfer_filter).await?;
while let Some(log) = stream.next().await {
println!(
"block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}",
log.block_number,
log.transaction_hash,
log.address,
Address::from(log.topics[1]),
Address::from(log.topics[2]),
U256::decode(log.data)
);
}
Ok(())
}