diff --git a/interfaces/node.go b/interfaces/node.go index da7d985..0112faa 100644 --- a/interfaces/node.go +++ b/interfaces/node.go @@ -8,6 +8,7 @@ import ( "git.lumeweb.com/LumeWeb/libs5-go/types" bolt "go.etcd.io/bbolt" "go.uber.org/zap" + "sync" ) //go:generate mockgen -source=node.go -destination=../mocks/interfaces/node.go -package=interfaces @@ -25,4 +26,6 @@ type Node interface { NetworkId() string DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error) + WaitOnConnectedPeers() + ConnectionTracker() *sync.WaitGroup } diff --git a/node/node.go b/node/node.go index 6350f3e..a0d96d8 100644 --- a/node/node.go +++ b/node/node.go @@ -16,6 +16,7 @@ import ( bolt "go.etcd.io/bbolt" "go.uber.org/zap" "log" + "sync" "time" ) @@ -31,6 +32,7 @@ type NodeImpl struct { services interfaces.Services cacheBucket *bolt.Bucket httpClient *resty.Client + connections sync.WaitGroup } func (n *NodeImpl) NetworkId() string { @@ -284,3 +286,10 @@ func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error return md, nil } +func (n *NodeImpl) WaitOnConnectedPeers() { + n.connections.Wait() +} + +func (n *NodeImpl) ConnectionTracker() *sync.WaitGroup { + return &n.connections +} diff --git a/service/p2p.go b/service/p2p.go index 0be9e83..a5eb153 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -206,12 +206,16 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error { peer.SetId(id) + p.Node().ConnectionTracker().Add(1) + go func() { err := p.OnNewPeer(peer, true) if err != nil { - p.logger.Error("failed to add peer", zap.Error(err)) + p.logger.Error("peer error", zap.Error(err)) } + p.Node().ConnectionTracker().Done() }() + return nil } @@ -219,6 +223,9 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error { func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error { var wg sync.WaitGroup + pid, _ := peer.Id().ToString() + p.logger.Debug("OnNewPeer started", zap.String("peer", pid)) + challenge := protocol.GenerateChallenge() peer.SetChallenge(challenge)