diff --git a/client/src/client.rs b/client/src/client.rs index aafecdb..93ed219 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -213,7 +213,7 @@ impl ClientBuilder { } pub struct Client { - node: Arc>>, + node: Node, #[cfg(not(target_arch = "wasm32"))] rpc: Option>, db: DB, diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 513d7aa..350dfca 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -23,13 +23,13 @@ use common::{ use execution::types::{CallOpts, ExecutionBlock}; pub struct Rpc { - node: Arc>>, + node: Node, handle: Option, port: u16, } impl Rpc { - pub fn new(node: Arc>>, port: u16) -> Self { + pub fn new(node: Node, port: u16) -> Self { Rpc { node, handle: None, @@ -126,7 +126,7 @@ trait NetRpc { } struct RpcInner { - node: Arc>>, + node: Node, port: u16, } diff --git a/consensus/src/p2p/discovery/mod.rs b/consensus/src/p2p/discovery/mod.rs index 1eaf105..732f7f2 100644 --- a/consensus/src/p2p/discovery/mod.rs +++ b/consensus/src/p2p/discovery/mod.rs @@ -21,6 +21,7 @@ use std::time::Instant; use std::collections::HashMap; use std::task::{Context, Poll}; use log::{debug, error}; +use std::sync::{Arc, Mutex}; use super::config::Config as ConsensusConfig; mod enr; @@ -67,9 +68,9 @@ impl From for DiscoveryError { pub struct Discovery { discv5: Discv5, local_enr: Enr, - event_stream: EventStream, + event_stream: Arc>, multiaddr_map: HashMap, - active_queries: FuturesUnordered + Send>>>, + active_queries: Arc + Send>>>>>, pub started: bool, } @@ -105,9 +106,9 @@ impl Discovery { Ok(Self { discv5, local_enr, - event_stream, + event_stream: Arc::new(Mutex::new(event_stream)), multiaddr_map: HashMap::new(), - active_queries: FuturesUnordered::new(), + active_queries: Arc::new(Mutex::new(FuturesUnordered::new())), started: !config.disable_discovery, }) } @@ -123,13 +124,17 @@ impl Discovery { let peers_enr = self.discv5.find_node_predicate(target, predicate, 16); - self.active_queries.push(Box::pin(peers_enr)); + // TODO: Consider changing error handling to not use unwrap + self.active_queries.lock().unwrap().push(Box::pin(peers_enr)); } fn get_peers(&mut self, cx: &mut Context) -> Option { - while let Poll::Ready(Some(res)) = self.active_queries.poll_next_unpin(cx) { + // TODO: Consider changing error handling to not use unwrap + while let Poll::Ready(Some(res)) = self.active_queries.lock().unwrap().poll_next_unpin(cx) { if res.is_ok() { - self.active_queries = FuturesUnordered::new(); + if let Ok(mut active_queries) = self.active_queries.lock() { + active_queries.clear(); + } let mut peers: HashMap> = HashMap::new(); @@ -178,6 +183,7 @@ impl NetworkBehaviour for Discovery { peer_addresses } + // Main execution loop to drive the behaviour fn poll( &mut self, cx: &mut Context, @@ -192,20 +198,26 @@ impl NetworkBehaviour for Discovery { if let Some(dp) = self.get_peers(cx) { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(dp)); - } + }; + // Process the discovery server event stream - match self.event_stream { + // TODO: Fix error hadling by removing unwrap + match *self.event_stream.lock().unwrap() { EventStream::Awaiting(ref mut fut) => { // Still awaiting the event stream, poll it if let Poll::Ready(event_stream) = fut.poll_unpin(cx) { match event_stream { Ok(stream) => { println!("Discv5 event stream ready"); - self.event_stream = EventStream::Present(stream); + if let Ok(mut event_stream) = self.event_stream.lock() { + *event_stream = EventStream::Present(stream); + } } Err(_) => { println!("Discv5 event stream failed"); - self.event_stream = EventStream::InActive; + if let Ok(mut event_stream) = self.event_stream.lock() { + *event_stream = EventStream::InActive; + } } } }