fix(core): serialize filters properly and always rewake (#61)

* fix: serialize filters properly

* test: add filter log tests

* fix(stream): always re-wake
This commit is contained in:
Georgios Konstantopoulos 2020-08-31 23:40:49 +03:00 committed by GitHub
parent cff6eb45a5
commit 6197d8bb12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 118 additions and 20 deletions

View File

@ -3,7 +3,7 @@ use crate::{
types::{Address, BlockNumber, Bytes, H256, U256, U64},
utils::keccak256,
};
use serde::{Deserialize, Serialize, Serializer};
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use std::str::FromStr;
/// A log produced by a transaction.
@ -64,18 +64,15 @@ pub struct Log {
}
/// Filter for
#[derive(Default, Debug, PartialEq, Clone, Serialize)]
#[derive(Default, Debug, PartialEq, Clone)]
pub struct Filter {
/// From Block
#[serde(rename = "fromBlock", skip_serializing_if = "Option::is_none")]
pub from_block: Option<BlockNumber>,
/// To Block
#[serde(rename = "toBlock", skip_serializing_if = "Option::is_none")]
pub to_block: Option<BlockNumber>,
/// Address
#[serde(skip_serializing_if = "Option::is_none")]
// TODO: The spec says that this can also be an array, do we really want to
// monitor for the same event for multiple contracts?
address: Option<Address>,
@ -86,10 +83,48 @@ pub struct Filter {
pub topics: [Option<ValueOrArray<H256>>; 4],
/// Limit
#[serde(skip_serializing_if = "Option::is_none")]
limit: Option<usize>,
}
impl Serialize for Filter {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut s = serializer.serialize_struct("Filter", 5)?;
if let Some(ref from_block) = self.from_block {
s.serialize_field("fromBlock", from_block)?;
}
if let Some(ref to_block) = self.to_block {
s.serialize_field("toBlock", to_block)?;
}
if let Some(ref address) = self.address {
s.serialize_field("address", address)?;
}
let mut filtered_topics = Vec::new();
for i in 0..4 {
if self.topics[i].is_some() {
filtered_topics.push(&self.topics[i]);
} else {
// TODO: This can be optimized
if self.topics[i + 1..].iter().any(|x| x.is_some()) {
filtered_topics.push(&None);
}
}
}
s.serialize_field("topics", &filtered_topics)?;
if let Some(ref limit) = self.limit {
s.serialize_field("limit", limit)?;
}
s.end()
}
}
impl Filter {
pub fn new() -> Self {
Self::default()
@ -205,21 +240,83 @@ where
mod tests {
use super::*;
use crate::utils::serialize;
use serde_json::json;
#[test]
fn filter_serialization_test() {
let t1 = "9729a6fbefefc8f6005933898b13dc45c3a2c8b7"
.parse::<Address>()
.unwrap();
let t2 = H256::from([0; 32]);
let t3 = U256::from(123);
let filter = Filter::new()
.address_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d")
.unwrap()
.event("ValueChanged(address,string,string)") // event name
.topic1(t1)
.topic2(t3);
let ser = serialize(&filter).to_string();
assert_eq!(ser, "{\"address\":\"0xf817796f60d268a36a57b8d2df1b97b14c0d0e1d\",\"topics\":[\"0xe826f71647b8486f2bae59832124c70792fba044036720a54ec8dacdd5df4fcb\",\"0x0000000000000000000000009729a6fbefefc8f6005933898b13dc45c3a2c8b7\",\"0x000000000000000000000000000000000000000000000000000000000000007b\",null]}");
let t1_padded = H256::from(t1);
let t3_padded = H256::from({
let mut x = [0; 32];
x[31] = 123;
x
});
let event = "ValueChanged(address,string,string)";
let t0 = H256::from(keccak256(event.as_bytes()));
let addr = Address::from_str("f817796F60D268A36a57b8D2dF1B97B14C0D0E1d").unwrap();
let filter = Filter::new();
let ser = serialize(&filter.clone());
assert_eq!(ser, json!({ "topics": [] }));
let filter = filter.address(addr);
let ser = serialize(&filter.clone());
assert_eq!(ser, json!({"address" : addr, "topics": []}));
let filter = filter.event(event);
// 0
let ser = serialize(&filter.clone());
assert_eq!(ser, json!({ "address" : addr, "topics": [t0]}));
// 1
let ser = serialize(&filter.clone().topic1(t1));
assert_eq!(ser, json!({ "address" : addr, "topics": [t0, t1_padded]}));
// 2
let ser = serialize(&filter.clone().topic2(t2));
assert_eq!(ser, json!({ "address" : addr, "topics": [t0, null, t2]}));
// 3
let ser = serialize(&filter.clone().topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, null, null, t3_padded]})
);
// 1 & 2
let ser = serialize(&filter.clone().topic1(t1).topic2(t2));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, t1_padded, t2]})
);
// 1 & 3
let ser = serialize(&filter.clone().topic1(t1).topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, t1_padded, null, t3_padded]})
);
// 2 & 3
let ser = serialize(&filter.clone().topic2(t2).topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, null, t2, t3_padded]})
);
// 1 & 2 & 3
let ser = serialize(&filter.clone().topic1(t1).topic2(t2).topic3(t3));
assert_eq!(
ser,
json!({ "address" : addr, "topics": [t0, t1_padded, t2, t3_padded]})
);
}
}

View File

@ -125,19 +125,20 @@ where
// in a streamed loop we wouldn't want the loop to terminate if an error
// is encountered (since it might be a temporary error).
let items: Vec<R> = futures_util::ready!(fut.poll_unpin(cx)).unwrap_or_default();
cx.waker().wake_by_ref();
FilterWatcherState::NextItem(items.into_iter())
}
// Consume 1 element from the vector. If more elements are in the vector,
// the next call will immediately go to this branch instead of trying to get
// filter changes again. Once the whole vector is consumed, it will poll again
// for new logs
FilterWatcherState::NextItem(iter) => match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => {
cx.waker().wake_by_ref();
FilterWatcherState::WaitForInterval
FilterWatcherState::NextItem(iter) => {
cx.waker().wake_by_ref();
match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
}
},
}
};
Poll::Pending