refactor: need to add in node level wait group to optionally wait and keep the node running

This commit is contained in:
Derrick Hammer 2024-01-09 09:11:36 -05:00
parent ff1db75f14
commit 8281729888
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
3 changed files with 20 additions and 1 deletions

View File

@ -8,6 +8,7 @@ import (
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
"sync"
) )
//go:generate mockgen -source=node.go -destination=../mocks/interfaces/node.go -package=interfaces //go:generate mockgen -source=node.go -destination=../mocks/interfaces/node.go -package=interfaces
@ -25,4 +26,6 @@ type Node interface {
NetworkId() string NetworkId() string
DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error)
GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error)
WaitOnConnectedPeers()
ConnectionTracker() *sync.WaitGroup
} }

View File

@ -16,6 +16,7 @@ import (
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
"log" "log"
"sync"
"time" "time"
) )
@ -31,6 +32,7 @@ type NodeImpl struct {
services interfaces.Services services interfaces.Services
cacheBucket *bolt.Bucket cacheBucket *bolt.Bucket
httpClient *resty.Client httpClient *resty.Client
connections sync.WaitGroup
} }
func (n *NodeImpl) NetworkId() string { func (n *NodeImpl) NetworkId() string {
@ -284,3 +286,10 @@ func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error
return md, nil return md, nil
} }
func (n *NodeImpl) WaitOnConnectedPeers() {
n.connections.Wait()
}
func (n *NodeImpl) ConnectionTracker() *sync.WaitGroup {
return &n.connections
}

View File

@ -206,12 +206,16 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
peer.SetId(id) peer.SetId(id)
p.Node().ConnectionTracker().Add(1)
go func() { go func() {
err := p.OnNewPeer(peer, true) err := p.OnNewPeer(peer, true)
if err != nil { if err != nil {
p.logger.Error("failed to add peer", zap.Error(err)) p.logger.Error("peer error", zap.Error(err))
} }
p.Node().ConnectionTracker().Done()
}() }()
return nil return nil
} }
@ -219,6 +223,9 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool) error {
func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error { func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error {
var wg sync.WaitGroup var wg sync.WaitGroup
pid, _ := peer.Id().ToString()
p.logger.Debug("OnNewPeer started", zap.String("peer", pid))
challenge := protocol.GenerateChallenge() challenge := protocol.GenerateChallenge()
peer.SetChallenge(challenge) peer.SetChallenge(challenge)