From 9fc142ca615123ace787fbb8c1c65a5718b60f77 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Fri, 30 Jul 2021 14:01:38 +0300 Subject: [PATCH] feat: stream_with_meta (#354) --- ethers-contract/src/event.rs | 39 ++----------------------------- ethers-contract/src/lib.rs | 4 ++-- ethers-contract/src/log.rs | 36 ++++++++++++++++++++++++++++ ethers-contract/src/stream.rs | 30 ++++++++++++++++++++++++ ethers-contract/tests/contract.rs | 17 +++++++++++++- 5 files changed, 86 insertions(+), 40 deletions(-) diff --git a/ethers-contract/src/event.rs b/ethers-contract/src/event.rs index 38d8bb3d..2d06087c 100644 --- a/ethers-contract/src/event.rs +++ b/ethers-contract/src/event.rs @@ -1,8 +1,8 @@ -use crate::{stream::EventStream, ContractError, EthLogDecode}; +use crate::{log::LogMeta, stream::EventStream, ContractError, EthLogDecode}; use ethers_core::{ abi::{Detokenize, RawLog}, - types::{Address, BlockNumber, Filter, Log, TxHash, ValueOrArray, H256, U256, U64}, + types::{BlockNumber, Filter, Log, ValueOrArray, H256}, }; use ethers_providers::{FilterWatcher, Middleware, PubsubClient, SubscriptionStream}; use std::borrow::Cow; @@ -214,38 +214,3 @@ where .map_err(From::from) } } - -/// Metadata inside a log -#[derive(Clone, Debug, PartialEq)] -pub struct LogMeta { - /// Address from which this log originated - pub address: Address, - - /// The block in which the log was emitted - pub block_number: U64, - - /// The block hash in which the log was emitted - pub block_hash: H256, - - /// The transaction hash in which the log was emitted - pub transaction_hash: TxHash, - - /// Transactions index position log was created from - pub transaction_index: U64, - - /// Log index position in the block - pub log_index: U256, -} - -impl From<&Log> for LogMeta { - fn from(src: &Log) -> Self { - LogMeta { - address: src.address, - block_number: src.block_number.expect("should have a block number"), - block_hash: src.block_hash.expect("should have a block hash"), - transaction_hash: src.transaction_hash.expect("should have a tx hash"), - transaction_index: src.transaction_index.expect("should have a tx index"), - log_index: src.log_index.expect("should have a log index"), - } - } -} diff --git a/ethers-contract/src/lib.rs b/ethers-contract/src/lib.rs index 3d82029c..6d2c544d 100644 --- a/ethers-contract/src/lib.rs +++ b/ethers-contract/src/lib.rs @@ -26,10 +26,10 @@ mod factory; pub use factory::ContractFactory; mod event; -pub use event::{EthEvent, LogMeta}; +pub use event::EthEvent; mod log; -pub use log::{decode_logs, EthLogDecode}; +pub use log::{decode_logs, EthLogDecode, LogMeta}; mod stream; diff --git a/ethers-contract/src/log.rs b/ethers-contract/src/log.rs index cb538649..08e81aaa 100644 --- a/ethers-contract/src/log.rs +++ b/ethers-contract/src/log.rs @@ -1,6 +1,7 @@ //! Mod of types for ethereum logs use ethers_core::abi::Error; use ethers_core::abi::RawLog; +use ethers_core::types::{Address, Log, TxHash, H256, U256, U64}; /// A trait for types (events) that can be decoded from a `RawLog` pub trait EthLogDecode: Send + Sync { @@ -14,3 +15,38 @@ pub trait EthLogDecode: Send + Sync { pub fn decode_logs(logs: &[RawLog]) -> Result, Error> { logs.iter().map(T::decode_log).collect() } + +/// Metadata inside a log +#[derive(Clone, Debug, PartialEq)] +pub struct LogMeta { + /// Address from which this log originated + pub address: Address, + + /// The block in which the log was emitted + pub block_number: U64, + + /// The block hash in which the log was emitted + pub block_hash: H256, + + /// The transaction hash in which the log was emitted + pub transaction_hash: TxHash, + + /// Transactions index position log was created from + pub transaction_index: U64, + + /// Log index position in the block + pub log_index: U256, +} + +impl From<&Log> for LogMeta { + fn from(src: &Log) -> Self { + LogMeta { + address: src.address, + block_number: src.block_number.expect("should have a block number"), + block_hash: src.block_hash.expect("should have a block hash"), + transaction_hash: src.transaction_hash.expect("should have a tx hash"), + transaction_index: src.transaction_index.expect("should have a tx index"), + log_index: src.log_index.expect("should have a log index"), + } + } +} diff --git a/ethers-contract/src/stream.rs b/ethers-contract/src/stream.rs index b0385b0a..79d6bae3 100644 --- a/ethers-contract/src/stream.rs +++ b/ethers-contract/src/stream.rs @@ -1,3 +1,4 @@ +use crate::LogMeta; use ethers_core::types::{Log, U256}; use futures_util::stream::{Stream, StreamExt}; use pin_project::pin_project; @@ -19,6 +20,12 @@ pub struct EventStream<'a, T, R, E> { parse: MapEvent<'a, R, E>, } +impl<'a, T, R, E> EventStream<'a, T, R, E> { + pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> { + EventStreamMeta(self) + } +} + impl<'a, T, R, E> EventStream<'a, T, R, E> { pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self { Self { id, stream, parse } @@ -39,3 +46,26 @@ where } } } + +#[pin_project] +pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>); + +impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E> +where + T: Stream + Unpin, +{ + type Item = Result<(R, LogMeta), E>; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + let this = self.project(); + match futures_util::ready!(this.0.stream.poll_next_unpin(ctx)) { + Some(item) => { + let meta = LogMeta::from(&item); + let res = (this.0.parse)(item); + let res = res.map(|inner| (inner, meta)); + Poll::Ready(Some(res)) + } + None => Poll::Pending, + } + } +} diff --git a/ethers-contract/tests/contract.rs b/ethers-contract/tests/contract.rs index baa335fd..773bb868 100644 --- a/ethers-contract/tests/contract.rs +++ b/ethers-contract/tests/contract.rs @@ -278,7 +278,7 @@ mod eth_tests { let (abi, bytecode) = compile_contract("SimpleStorage", "SimpleStorage.sol"); let ganache = Ganache::new().spawn(); let client = connect(&ganache, 0); - let contract = deploy(client, abi.clone(), bytecode).await; + let contract = deploy(client.clone(), abi.clone(), bytecode).await; // We spawn the event listener: let event = contract.event::(); @@ -292,9 +292,13 @@ mod eth_tests { let mut subscription = event2.subscribe().await.unwrap(); assert_eq!(subscription.id, 2.into()); + let mut subscription_meta = event2.subscribe().await.unwrap().with_meta(); + assert_eq!(subscription_meta.0.id, 3.into()); + let num_calls = 3u64; // and we make a few calls + let num = client.get_block_number().await.unwrap(); for i in 0..num_calls { let call = contract .method::<_, H256>("setValue", i.to_string()) @@ -307,8 +311,19 @@ mod eth_tests { // unwrap the option of the stream, then unwrap the decoding result let log = stream.next().await.unwrap().unwrap(); let log2 = subscription.next().await.unwrap().unwrap(); + let (log3, meta) = subscription_meta.next().await.unwrap().unwrap(); + assert_eq!(log.new_value, log3.new_value); assert_eq!(log.new_value, log2.new_value); assert_eq!(log.new_value, i.to_string()); + assert_eq!(meta.block_number, num + i + 1); + let hash = client + .get_block(num + i + 1) + .await + .unwrap() + .unwrap() + .hash + .unwrap(); + assert_eq!(meta.block_hash, hash); } }