trying with Arc<Mutex

This commit is contained in:
geemo 2023-03-01 21:11:16 -06:00
parent 72f74bd05a
commit a740cb656f
3 changed files with 27 additions and 15 deletions

View File

@ -213,7 +213,7 @@ impl ClientBuilder {
} }
pub struct Client<DB: Database, N: ConsensusNetworkInterface> { pub struct Client<DB: Database, N: ConsensusNetworkInterface> {
node: Arc<RwLock<Node<N>>>, node: Node<N>,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
rpc: Option<Rpc<N>>, rpc: Option<Rpc<N>>,
db: DB, db: DB,

View File

@ -23,13 +23,13 @@ use common::{
use execution::types::{CallOpts, ExecutionBlock}; use execution::types::{CallOpts, ExecutionBlock};
pub struct Rpc<N: ConsensusNetworkInterface> { pub struct Rpc<N: ConsensusNetworkInterface> {
node: Arc<RwLock<Node<N>>>, node: Node<N>,
handle: Option<HttpServerHandle>, handle: Option<HttpServerHandle>,
port: u16, port: u16,
} }
impl<N: ConsensusNetworkInterface> Rpc<N> { impl<N: ConsensusNetworkInterface> Rpc<N> {
pub fn new(node: Arc<RwLock<Node<N>>>, port: u16) -> Self { pub fn new(node: Node<N>, port: u16) -> Self {
Rpc { Rpc {
node, node,
handle: None, handle: None,
@ -126,7 +126,7 @@ trait NetRpc {
} }
struct RpcInner<N: ConsensusNetworkInterface> { struct RpcInner<N: ConsensusNetworkInterface> {
node: Arc<RwLock<Node<N>>>, node: Node<N>,
port: u16, port: u16,
} }

View File

@ -21,6 +21,7 @@ use std::time::Instant;
use std::collections::HashMap; use std::collections::HashMap;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use log::{debug, error}; use log::{debug, error};
use std::sync::{Arc, Mutex};
use super::config::Config as ConsensusConfig; use super::config::Config as ConsensusConfig;
mod enr; mod enr;
@ -67,9 +68,9 @@ impl From<Discv5Error> for DiscoveryError {
pub struct Discovery { pub struct Discovery {
discv5: Discv5, discv5: Discv5,
local_enr: Enr, local_enr: Enr,
event_stream: EventStream, event_stream: Arc<Mutex<EventStream>>,
multiaddr_map: HashMap<PeerId, Multiaddr>, multiaddr_map: HashMap<PeerId, Multiaddr>,
active_queries: FuturesUnordered<std::pin::Pin<Box<dyn Future<Output = DiscResult> + Send>>>, active_queries: Arc<Mutex<FuturesUnordered<std::pin::Pin<Box<dyn Future<Output = DiscResult> + Send>>>>>,
pub started: bool, pub started: bool,
} }
@ -105,9 +106,9 @@ impl Discovery {
Ok(Self { Ok(Self {
discv5, discv5,
local_enr, local_enr,
event_stream, event_stream: Arc::new(Mutex::new(event_stream)),
multiaddr_map: HashMap::new(), multiaddr_map: HashMap::new(),
active_queries: FuturesUnordered::new(), active_queries: Arc::new(Mutex::new(FuturesUnordered::new())),
started: !config.disable_discovery, started: !config.disable_discovery,
}) })
} }
@ -123,13 +124,17 @@ impl Discovery {
let peers_enr = self.discv5.find_node_predicate(target, predicate, 16); let peers_enr = self.discv5.find_node_predicate(target, predicate, 16);
self.active_queries.push(Box::pin(peers_enr)); // TODO: Consider changing error handling to not use unwrap
self.active_queries.lock().unwrap().push(Box::pin(peers_enr));
} }
fn get_peers(&mut self, cx: &mut Context) -> Option<DiscoveredPeers> { fn get_peers(&mut self, cx: &mut Context) -> Option<DiscoveredPeers> {
while let Poll::Ready(Some(res)) = self.active_queries.poll_next_unpin(cx) { // TODO: Consider changing error handling to not use unwrap
while let Poll::Ready(Some(res)) = self.active_queries.lock().unwrap().poll_next_unpin(cx) {
if res.is_ok() { if res.is_ok() {
self.active_queries = FuturesUnordered::new(); if let Ok(mut active_queries) = self.active_queries.lock() {
active_queries.clear();
}
let mut peers: HashMap<PeerId, Option<Instant>> = HashMap::new(); let mut peers: HashMap<PeerId, Option<Instant>> = HashMap::new();
@ -178,6 +183,7 @@ impl NetworkBehaviour for Discovery {
peer_addresses peer_addresses
} }
// Main execution loop to drive the behaviour
fn poll( fn poll(
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
@ -192,20 +198,26 @@ impl NetworkBehaviour for Discovery {
if let Some(dp) = self.get_peers(cx) { if let Some(dp) = self.get_peers(cx) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(dp)); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(dp));
} };
// Process the discovery server event stream // Process the discovery server event stream
match self.event_stream { // TODO: Fix error hadling by removing unwrap
match *self.event_stream.lock().unwrap() {
EventStream::Awaiting(ref mut fut) => { EventStream::Awaiting(ref mut fut) => {
// Still awaiting the event stream, poll it // Still awaiting the event stream, poll it
if let Poll::Ready(event_stream) = fut.poll_unpin(cx) { if let Poll::Ready(event_stream) = fut.poll_unpin(cx) {
match event_stream { match event_stream {
Ok(stream) => { Ok(stream) => {
println!("Discv5 event stream ready"); println!("Discv5 event stream ready");
self.event_stream = EventStream::Present(stream); if let Ok(mut event_stream) = self.event_stream.lock() {
*event_stream = EventStream::Present(stream);
}
} }
Err(_) => { Err(_) => {
println!("Discv5 event stream failed"); println!("Discv5 event stream failed");
self.event_stream = EventStream::InActive; if let Ok(mut event_stream) = self.event_stream.lock() {
*event_stream = EventStream::InActive;
}
} }
} }
} }