feat: stream_with_meta (#354)

This commit is contained in:
Georgios Konstantopoulos 2021-07-30 14:01:38 +03:00 committed by GitHub
parent c8d9f9ba85
commit 9fc142ca61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 40 deletions

View File

@ -1,8 +1,8 @@
use crate::{stream::EventStream, ContractError, EthLogDecode};
use crate::{log::LogMeta, stream::EventStream, ContractError, EthLogDecode};
use ethers_core::{
abi::{Detokenize, RawLog},
types::{Address, BlockNumber, Filter, Log, TxHash, ValueOrArray, H256, U256, U64},
types::{BlockNumber, Filter, Log, ValueOrArray, H256},
};
use ethers_providers::{FilterWatcher, Middleware, PubsubClient, SubscriptionStream};
use std::borrow::Cow;
@ -214,38 +214,3 @@ where
.map_err(From::from)
}
}
/// Metadata inside a log
#[derive(Clone, Debug, PartialEq)]
pub struct LogMeta {
/// Address from which this log originated
pub address: Address,
/// The block in which the log was emitted
pub block_number: U64,
/// The block hash in which the log was emitted
pub block_hash: H256,
/// The transaction hash in which the log was emitted
pub transaction_hash: TxHash,
/// Transactions index position log was created from
pub transaction_index: U64,
/// Log index position in the block
pub log_index: U256,
}
impl From<&Log> for LogMeta {
fn from(src: &Log) -> Self {
LogMeta {
address: src.address,
block_number: src.block_number.expect("should have a block number"),
block_hash: src.block_hash.expect("should have a block hash"),
transaction_hash: src.transaction_hash.expect("should have a tx hash"),
transaction_index: src.transaction_index.expect("should have a tx index"),
log_index: src.log_index.expect("should have a log index"),
}
}
}

View File

@ -26,10 +26,10 @@ mod factory;
pub use factory::ContractFactory;
mod event;
pub use event::{EthEvent, LogMeta};
pub use event::EthEvent;
mod log;
pub use log::{decode_logs, EthLogDecode};
pub use log::{decode_logs, EthLogDecode, LogMeta};
mod stream;

View File

@ -1,6 +1,7 @@
//! Mod of types for ethereum logs
use ethers_core::abi::Error;
use ethers_core::abi::RawLog;
use ethers_core::types::{Address, Log, TxHash, H256, U256, U64};
/// A trait for types (events) that can be decoded from a `RawLog`
pub trait EthLogDecode: Send + Sync {
@ -14,3 +15,38 @@ pub trait EthLogDecode: Send + Sync {
pub fn decode_logs<T: EthLogDecode>(logs: &[RawLog]) -> Result<Vec<T>, Error> {
logs.iter().map(T::decode_log).collect()
}
/// Metadata inside a log
#[derive(Clone, Debug, PartialEq)]
pub struct LogMeta {
/// Address from which this log originated
pub address: Address,
/// The block in which the log was emitted
pub block_number: U64,
/// The block hash in which the log was emitted
pub block_hash: H256,
/// The transaction hash in which the log was emitted
pub transaction_hash: TxHash,
/// Transactions index position log was created from
pub transaction_index: U64,
/// Log index position in the block
pub log_index: U256,
}
impl From<&Log> for LogMeta {
fn from(src: &Log) -> Self {
LogMeta {
address: src.address,
block_number: src.block_number.expect("should have a block number"),
block_hash: src.block_hash.expect("should have a block hash"),
transaction_hash: src.transaction_hash.expect("should have a tx hash"),
transaction_index: src.transaction_index.expect("should have a tx index"),
log_index: src.log_index.expect("should have a log index"),
}
}
}

View File

@ -1,3 +1,4 @@
use crate::LogMeta;
use ethers_core::types::{Log, U256};
use futures_util::stream::{Stream, StreamExt};
use pin_project::pin_project;
@ -19,6 +20,12 @@ pub struct EventStream<'a, T, R, E> {
parse: MapEvent<'a, R, E>,
}
impl<'a, T, R, E> EventStream<'a, T, R, E> {
pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> {
EventStreamMeta(self)
}
}
impl<'a, T, R, E> EventStream<'a, T, R, E> {
pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self {
Self { id, stream, parse }
@ -39,3 +46,26 @@ where
}
}
}
#[pin_project]
pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>);
impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin,
{
type Item = Result<(R, LogMeta), E>;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match futures_util::ready!(this.0.stream.poll_next_unpin(ctx)) {
Some(item) => {
let meta = LogMeta::from(&item);
let res = (this.0.parse)(item);
let res = res.map(|inner| (inner, meta));
Poll::Ready(Some(res))
}
None => Poll::Pending,
}
}
}

View File

@ -278,7 +278,7 @@ mod eth_tests {
let (abi, bytecode) = compile_contract("SimpleStorage", "SimpleStorage.sol");
let ganache = Ganache::new().spawn();
let client = connect(&ganache, 0);
let contract = deploy(client, abi.clone(), bytecode).await;
let contract = deploy(client.clone(), abi.clone(), bytecode).await;
// We spawn the event listener:
let event = contract.event::<ValueChanged>();
@ -292,9 +292,13 @@ mod eth_tests {
let mut subscription = event2.subscribe().await.unwrap();
assert_eq!(subscription.id, 2.into());
let mut subscription_meta = event2.subscribe().await.unwrap().with_meta();
assert_eq!(subscription_meta.0.id, 3.into());
let num_calls = 3u64;
// and we make a few calls
let num = client.get_block_number().await.unwrap();
for i in 0..num_calls {
let call = contract
.method::<_, H256>("setValue", i.to_string())
@ -307,8 +311,19 @@ mod eth_tests {
// unwrap the option of the stream, then unwrap the decoding result
let log = stream.next().await.unwrap().unwrap();
let log2 = subscription.next().await.unwrap().unwrap();
let (log3, meta) = subscription_meta.next().await.unwrap().unwrap();
assert_eq!(log.new_value, log3.new_value);
assert_eq!(log.new_value, log2.new_value);
assert_eq!(log.new_value, i.to_string());
assert_eq!(meta.block_number, num + i + 1);
let hash = client
.get_block(num + i + 1)
.await
.unwrap()
.unwrap()
.hash
.unwrap();
assert_eq!(meta.block_hash, hash);
}
}