From 6197d8bb1275b1630ffe055815784ac86bb5cbe9 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 31 Aug 2020 23:40:49 +0300 Subject: [PATCH] fix(core): serialize filters properly and always rewake (#61) * fix: serialize filters properly * test: add filter log tests * fix(stream): always re-wake --- ethers-core/src/types/chainstate/log.rs | 125 +++++++++++++++++++++--- ethers-providers/src/stream.rs | 13 +-- 2 files changed, 118 insertions(+), 20 deletions(-) diff --git a/ethers-core/src/types/chainstate/log.rs b/ethers-core/src/types/chainstate/log.rs index d827e150..ccd55e66 100644 --- a/ethers-core/src/types/chainstate/log.rs +++ b/ethers-core/src/types/chainstate/log.rs @@ -3,7 +3,7 @@ use crate::{ types::{Address, BlockNumber, Bytes, H256, U256, U64}, utils::keccak256, }; -use serde::{Deserialize, Serialize, Serializer}; +use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; use std::str::FromStr; /// A log produced by a transaction. @@ -64,18 +64,15 @@ pub struct Log { } /// Filter for -#[derive(Default, Debug, PartialEq, Clone, Serialize)] +#[derive(Default, Debug, PartialEq, Clone)] pub struct Filter { /// From Block - #[serde(rename = "fromBlock", skip_serializing_if = "Option::is_none")] pub from_block: Option, /// To Block - #[serde(rename = "toBlock", skip_serializing_if = "Option::is_none")] pub to_block: Option, /// Address - #[serde(skip_serializing_if = "Option::is_none")] // TODO: The spec says that this can also be an array, do we really want to // monitor for the same event for multiple contracts? address: Option
, @@ -86,10 +83,48 @@ pub struct Filter { pub topics: [Option>; 4], /// Limit - #[serde(skip_serializing_if = "Option::is_none")] limit: Option, } +impl Serialize for Filter { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut s = serializer.serialize_struct("Filter", 5)?; + if let Some(ref from_block) = self.from_block { + s.serialize_field("fromBlock", from_block)?; + } + + if let Some(ref to_block) = self.to_block { + s.serialize_field("toBlock", to_block)?; + } + + if let Some(ref address) = self.address { + s.serialize_field("address", address)?; + } + + let mut filtered_topics = Vec::new(); + for i in 0..4 { + if self.topics[i].is_some() { + filtered_topics.push(&self.topics[i]); + } else { + // TODO: This can be optimized + if self.topics[i + 1..].iter().any(|x| x.is_some()) { + filtered_topics.push(&None); + } + } + } + s.serialize_field("topics", &filtered_topics)?; + + if let Some(ref limit) = self.limit { + s.serialize_field("limit", limit)?; + } + + s.end() + } +} + impl Filter { pub fn new() -> Self { Self::default() @@ -205,21 +240,83 @@ where mod tests { use super::*; use crate::utils::serialize; + use serde_json::json; #[test] fn filter_serialization_test() { let t1 = "9729a6fbefefc8f6005933898b13dc45c3a2c8b7" .parse::
() .unwrap(); + let t2 = H256::from([0; 32]); let t3 = U256::from(123); - let filter = Filter::new() - .address_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d") - .unwrap() - .event("ValueChanged(address,string,string)") // event name - .topic1(t1) - .topic2(t3); - let ser = serialize(&filter).to_string(); - assert_eq!(ser, "{\"address\":\"0xf817796f60d268a36a57b8d2df1b97b14c0d0e1d\",\"topics\":[\"0xe826f71647b8486f2bae59832124c70792fba044036720a54ec8dacdd5df4fcb\",\"0x0000000000000000000000009729a6fbefefc8f6005933898b13dc45c3a2c8b7\",\"0x000000000000000000000000000000000000000000000000000000000000007b\",null]}"); + let t1_padded = H256::from(t1); + let t3_padded = H256::from({ + let mut x = [0; 32]; + x[31] = 123; + x + }); + + let event = "ValueChanged(address,string,string)"; + let t0 = H256::from(keccak256(event.as_bytes())); + let addr = Address::from_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d").unwrap(); + let filter = Filter::new(); + + let ser = serialize(&filter.clone()); + assert_eq!(ser, json!({ "topics": [] })); + + let filter = filter.address(addr); + + let ser = serialize(&filter.clone()); + assert_eq!(ser, json!({"address" : addr, "topics": []})); + + let filter = filter.event(event); + + // 0 + let ser = serialize(&filter.clone()); + assert_eq!(ser, json!({ "address" : addr, "topics": [t0]})); + + // 1 + let ser = serialize(&filter.clone().topic1(t1)); + assert_eq!(ser, json!({ "address" : addr, "topics": [t0, t1_padded]})); + + // 2 + let ser = serialize(&filter.clone().topic2(t2)); + assert_eq!(ser, json!({ "address" : addr, "topics": [t0, null, t2]})); + + // 3 + let ser = serialize(&filter.clone().topic3(t3)); + assert_eq!( + ser, + json!({ "address" : addr, "topics": [t0, null, null, t3_padded]}) + ); + + // 1 & 2 + let ser = serialize(&filter.clone().topic1(t1).topic2(t2)); + assert_eq!( + ser, + json!({ "address" : addr, "topics": [t0, t1_padded, t2]}) + ); + + // 1 & 3 + let ser = serialize(&filter.clone().topic1(t1).topic3(t3)); + assert_eq!( + ser, + json!({ "address" : addr, "topics": [t0, t1_padded, null, t3_padded]}) + ); + + // 2 & 3 + let ser = serialize(&filter.clone().topic2(t2).topic3(t3)); + assert_eq!( + ser, + json!({ "address" : addr, "topics": [t0, null, t2, t3_padded]}) + ); + + // 1 & 2 & 3 + let ser = serialize(&filter.clone().topic1(t1).topic2(t2).topic3(t3)); + assert_eq!( + ser, + json!({ "address" : addr, "topics": [t0, t1_padded, t2, t3_padded]}) + ); } } diff --git a/ethers-providers/src/stream.rs b/ethers-providers/src/stream.rs index 25ac1098..0afad10f 100644 --- a/ethers-providers/src/stream.rs +++ b/ethers-providers/src/stream.rs @@ -125,19 +125,20 @@ where // in a streamed loop we wouldn't want the loop to terminate if an error // is encountered (since it might be a temporary error). let items: Vec = futures_util::ready!(fut.poll_unpin(cx)).unwrap_or_default(); + cx.waker().wake_by_ref(); FilterWatcherState::NextItem(items.into_iter()) } // Consume 1 element from the vector. If more elements are in the vector, // the next call will immediately go to this branch instead of trying to get // filter changes again. Once the whole vector is consumed, it will poll again // for new logs - FilterWatcherState::NextItem(iter) => match iter.next() { - Some(item) => return Poll::Ready(Some(item)), - None => { - cx.waker().wake_by_ref(); - FilterWatcherState::WaitForInterval + FilterWatcherState::NextItem(iter) => { + cx.waker().wake_by_ref(); + match iter.next() { + Some(item) => return Poll::Ready(Some(item)), + None => FilterWatcherState::WaitForInterval, } - }, + } }; Poll::Pending