fix: add a peerPending map to track and ensure we don't try to connect to a peer again until removed, even if we haven't gotten a handshake done

This commit is contained in:
Derrick Hammer 2024-01-09 10:42:21 -05:00
parent 13be047bf8
commit 1678b40d82
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 32 additions and 6 deletions

View File

@ -43,6 +43,7 @@ type P2PImpl struct {
inited bool
reconnectDelay structs.Map
peers structs.Map
peersPending structs.Map
}
func NewP2P(node interfaces.Node) *P2PImpl {
@ -54,6 +55,7 @@ func NewP2P(node interfaces.Node) *P2PImpl {
inited: false,
reconnectDelay: structs.NewMap(),
peers: structs.NewMap(),
peersPending: structs.NewMap(),
}
return service
@ -160,6 +162,11 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
return err
}
if p.peersPending.Contains(idString) || p.peers.Contains(idString) {
p.logger.Debug("already connected", zap.String("node", connectionUri.String()))
return nil
}
reconnectDelay := p.reconnectDelay.GetInt(idString)
if reconnectDelay == nil {
delay := 1
@ -208,6 +215,12 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
p.Node().ConnectionTracker().Add(1)
peerId, err := peer.Id().ToString()
if err != nil {
return err
}
p.peersPending.Put(peerId, peer)
go func() {
err := p.OnNewPeer(peer, true)
if err != nil {
@ -244,22 +257,27 @@ func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error {
if err != nil {
return err
}
p.logger.Debug("OnNewPeer sent handshake", zap.String("peer", pid))
p.logger.Debug("OnNewPeer before Wait", zap.String("peer", pid))
wg.Wait() // Wait for OnNewPeerListen goroutine to finish
p.logger.Debug("OnNewPeer ended", zap.String("peer", pid))
return nil
}
func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) {
peerId, err := peer.Id().ToString()
if err != nil {
p.logger.Error("failed to get peer id", zap.Error(err))
return
}
onDone := net.CloseCallback(func() {
peerId, err := peer.Id().ToString()
if err != nil {
p.logger.Error("failed to get peer id", zap.Error(err))
return
}
// Handle closure of the connection
if p.peers.Contains(peerId) {
p.peers.Remove(peerId)
}
if p.peersPending.Contains(peerId) {
p.peersPending.Remove(peerId)
}
})
onError := net.ErrorCallback(func(args ...interface{}) {
@ -270,6 +288,7 @@ func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) {
imsg := base.NewIncomingMessageUnknown()
err := msgpack.Unmarshal(message, imsg)
p.logger.Debug("ListenForMessages", zap.Any("message", imsg), zap.String("peer", peerId))
if err != nil {
return err
}
@ -395,6 +414,10 @@ func (p *P2PImpl) AddPeer(peer net.Peer) error {
p.peers.Put(peerId, peer)
p.reconnectDelay.Put(peerId, 1)
if p.peersPending.Contains(peerId) {
p.peersPending.Remove(peerId)
}
return nil
}
func (p *P2PImpl) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error {
@ -467,3 +490,6 @@ func (p *P2PImpl) vote(nodeId *encoding.NodeId, upvote bool) error {
return nil
}
func (p *P2PImpl) NodeId() *encoding.NodeId {
return p.localNodeID
}