feat: add paginated logs (#1285)
* feat: add paginated logs * docs: add paginated_logs example * remove unpin
This commit is contained in:
parent
324004cfd6
commit
a150666d54
|
@ -164,6 +164,8 @@
|
|||
|
||||
### Unreleased
|
||||
|
||||
- Stream of paginated logs that load logs in small pages
|
||||
[1285](https://github.com/gakonst/ethers-rs/pull/1285)
|
||||
- Load previous logs before subscribing to new logs in case fromBlock is set
|
||||
[1264](https://github.com/gakonst/ethers-rs/pull/1264)
|
||||
- Add retries to the pending transaction future
|
||||
|
|
|
@ -371,6 +371,19 @@ impl Filter {
|
|||
self.topics[3] = Some(topic.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn is_paginatable(&self) -> bool {
|
||||
self.get_from_block().is_some()
|
||||
}
|
||||
|
||||
pub fn get_from_block(&self) -> Option<U64> {
|
||||
match self.block_option {
|
||||
FilterBlockOption::AtBlockHash(_hash) => None,
|
||||
FilterBlockOption::Range { from_block, to_block: _ } => {
|
||||
from_block.map(|block| block.as_number()).unwrap_or(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Union type for representing a single value or a vector of values inside a filter
|
||||
|
|
|
@ -18,6 +18,9 @@ pub use pending_transaction::PendingTransaction;
|
|||
mod pending_escalator;
|
||||
pub use pending_escalator::EscalatingPending;
|
||||
|
||||
mod log_query;
|
||||
pub use log_query::LogQuery;
|
||||
|
||||
mod stream;
|
||||
pub use futures_util::StreamExt;
|
||||
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
|
||||
|
@ -421,6 +424,15 @@ pub trait Middleware: Sync + Send + Debug {
|
|||
self.inner().get_logs(filter).await.map_err(FromErr::from)
|
||||
}
|
||||
|
||||
/// Returns a stream of logs are loaded in pages of given page size
|
||||
fn get_logs_paginated<'a>(
|
||||
&'a self,
|
||||
filter: &Filter,
|
||||
page_size: u64,
|
||||
) -> LogQuery<'a, Self::Provider> {
|
||||
self.inner().get_logs_paginated(filter, page_size)
|
||||
}
|
||||
|
||||
async fn new_filter(&self, filter: FilterKind<'_>) -> Result<U256, Self::Error> {
|
||||
self.inner().new_filter(filter).await.map_err(FromErr::from)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
use super::{JsonRpcClient, Middleware, PinBoxFut, Provider};
|
||||
use ethers_core::types::{Filter, Log, U64};
|
||||
use futures_core::stream::Stream;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub struct LogQuery<'a, P> {
|
||||
provider: &'a Provider<P>,
|
||||
filter: Filter,
|
||||
from_block: Option<U64>,
|
||||
page_size: u64,
|
||||
current_logs: VecDeque<Log>,
|
||||
last_block: Option<U64>,
|
||||
state: LogQueryState<'a>,
|
||||
}
|
||||
|
||||
enum LogQueryState<'a> {
|
||||
Initial,
|
||||
LoadLastBlock(PinBoxFut<'a, U64>),
|
||||
LoadLogs(PinBoxFut<'a, Vec<Log>>),
|
||||
Consume,
|
||||
}
|
||||
|
||||
impl<'a, P> LogQuery<'a, P>
|
||||
where
|
||||
P: JsonRpcClient,
|
||||
{
|
||||
pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self {
|
||||
Self {
|
||||
provider,
|
||||
filter: filter.clone(),
|
||||
from_block: filter.get_from_block(),
|
||||
page_size: 10000,
|
||||
current_logs: VecDeque::new(),
|
||||
last_block: None,
|
||||
state: LogQueryState::Initial,
|
||||
}
|
||||
}
|
||||
|
||||
/// set page size for pagination
|
||||
pub fn with_page_size(mut self, page_size: u64) -> Self {
|
||||
self.page_size = page_size;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! rewake_with_new_state {
|
||||
($ctx:ident, $this:ident, $new_state:expr) => {
|
||||
$this.state = $new_state;
|
||||
$ctx.waker().wake_by_ref();
|
||||
return Poll::Pending
|
||||
};
|
||||
}
|
||||
|
||||
impl<'a, P> Stream for LogQuery<'a, P>
|
||||
where
|
||||
P: JsonRpcClient,
|
||||
{
|
||||
type Item = Log;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match &mut self.state {
|
||||
LogQueryState::Initial => {
|
||||
if !self.filter.is_paginatable() {
|
||||
// if not paginatable, load logs and consume
|
||||
let filter = self.filter.clone();
|
||||
let provider = self.provider;
|
||||
let fut = Box::pin(async move { provider.get_logs(&filter).await });
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||
} else {
|
||||
// if paginatable, load last block
|
||||
let fut = self.provider.get_block_number();
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
|
||||
}
|
||||
}
|
||||
LogQueryState::LoadLastBlock(fut) => {
|
||||
self.last_block = Some(
|
||||
futures_util::ready!(fut.as_mut().poll(ctx))
|
||||
.expect("error occurred loading last block"),
|
||||
);
|
||||
|
||||
let from_block = self.filter.get_from_block().unwrap();
|
||||
let to_block = from_block + self.page_size;
|
||||
self.from_block = Some(to_block);
|
||||
|
||||
let filter = self.filter.clone().from_block(from_block).to_block(to_block);
|
||||
let provider = self.provider;
|
||||
// load first page of logs
|
||||
let fut = Box::pin(async move { provider.get_logs(&filter).await });
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||
}
|
||||
LogQueryState::LoadLogs(fut) => {
|
||||
let logs = futures_util::ready!(fut.as_mut().poll(ctx))
|
||||
.expect("error occurred loading logs");
|
||||
self.current_logs = VecDeque::from(logs);
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
|
||||
}
|
||||
LogQueryState::Consume => {
|
||||
let log = self.current_logs.pop_front();
|
||||
if log.is_none() {
|
||||
// consumed all the logs
|
||||
if !self.filter.is_paginatable() {
|
||||
Poll::Ready(None)
|
||||
} else {
|
||||
// load new logs if there are still more pages to go through
|
||||
let from_block = self.from_block.unwrap();
|
||||
let to_block = from_block + self.page_size;
|
||||
|
||||
// no more pages to load, and everything is consumed
|
||||
if from_block > self.last_block.unwrap() {
|
||||
return Poll::Ready(None)
|
||||
}
|
||||
// load next page
|
||||
self.from_block = Some(to_block);
|
||||
|
||||
let filter = self.filter.clone().from_block(from_block).to_block(to_block);
|
||||
let provider = self.provider;
|
||||
let fut = Box::pin(async move { provider.get_logs(&filter).await });
|
||||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
|
||||
}
|
||||
} else {
|
||||
Poll::Ready(log)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2,7 +2,7 @@ use crate::{
|
|||
ens, erc, maybe,
|
||||
pubsub::{PubsubClient, SubscriptionStream},
|
||||
stream::{FilterWatcher, DEFAULT_POLL_INTERVAL},
|
||||
FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, MockProvider,
|
||||
FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, LogQuery, MockProvider,
|
||||
PendingTransaction, QuorumProvider, RwClient, SyncingStatus,
|
||||
};
|
||||
|
||||
|
@ -647,6 +647,10 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
|
|||
self.request("eth_getLogs", [filter]).await
|
||||
}
|
||||
|
||||
fn get_logs_paginated<'a>(&'a self, filter: &Filter, page_size: u64) -> LogQuery<'a, P> {
|
||||
LogQuery::new(self, filter).with_page_size(page_size)
|
||||
}
|
||||
|
||||
/// Streams matching filter logs
|
||||
async fn watch<'a>(
|
||||
&'a self,
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
use ethers::{abi::AbiDecode, prelude::*, utils::keccak256};
|
||||
use eyre::Result;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let client =
|
||||
Provider::<Ws>::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27")
|
||||
.await?;
|
||||
let client = Arc::new(client);
|
||||
|
||||
let last_block = client.get_block(BlockNumber::Latest).await?.unwrap().number.unwrap();
|
||||
println!("last_block: {}", last_block);
|
||||
|
||||
let erc20_transfer_filter = Filter::new()
|
||||
.from_block(last_block - 10000)
|
||||
.topic0(ValueOrArray::Value(H256::from(keccak256("Transfer(address,address,uint256)"))));
|
||||
|
||||
let mut stream = client.get_logs_paginated(&erc20_transfer_filter, 10);
|
||||
|
||||
while let Some(log) = stream.next().await {
|
||||
println!(
|
||||
"block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}",
|
||||
log.block_number,
|
||||
log.transaction_hash,
|
||||
log.address,
|
||||
log.topics.get(1),
|
||||
log.topics.get(2),
|
||||
U256::decode(log.data)
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue