From a51e3430e1a7f164814e1f38b4ead893ebfd28e3 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Tue, 30 Jan 2024 00:31:31 -0500 Subject: [PATCH] refactor: more refactoring to break import cycles, introduce a mediator between protocol and service --- common/mediator.go | 1 + protocol/handshake_open.go | 8 +-- protocol/hash_query.go | 20 +++---- protocol/mediator.go | 29 ++++++++++ protocol/protocol.go | 3 +- protocol/registry_entry.go | 4 +- protocol/registry_query.go | 4 +- protocol/signed_announce_peers.go | 4 +- protocol/signed_handshake_done.go | 9 ++-- protocol/storage_location.go | 13 +++-- service/default/mediator.go | 88 +++++++++++++++++++++++++++++++ service/default/p2p.go | 5 ++ 12 files changed, 153 insertions(+), 35 deletions(-) create mode 100644 common/mediator.go create mode 100644 protocol/mediator.go create mode 100644 service/default/mediator.go diff --git a/common/mediator.go b/common/mediator.go new file mode 100644 index 0000000..805d0c7 --- /dev/null +++ b/common/mediator.go @@ -0,0 +1 @@ +package common diff --git a/protocol/handshake_open.go b/protocol/handshake_open.go index 7a582f0..2fa442b 100644 --- a/protocol/handshake_open.go +++ b/protocol/handshake_open.go @@ -91,20 +91,20 @@ func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message IncomingMess func (h *HandshakeOpen) HandleMessage(message IncomingMessageData) error { peer := message.Peer - services := message.Services + mediator := message.Mediator - if h.networkId != services.P2P().NetworkId() { + if h.networkId != mediator.NetworkId() { return fmt.Errorf("Peer is in different network: %s", h.networkId) } - handshake := NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, services.P2P().SelfConnectionUris()) + handshake := NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, mediator.SelfConnectionUris()) hsMessage, err := msgpack.Marshal(handshake) if err != nil { return err } - secureMessage, err := services.P2P().SignMessageSimple(hsMessage) + secureMessage, err := mediator.SignMessageSimple(hsMessage) if err != nil { return err diff --git a/protocol/hash_query.go b/protocol/hash_query.go index e6e0435..e1aca99 100644 --- a/protocol/hash_query.go +++ b/protocol/hash_query.go @@ -89,11 +89,11 @@ func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error { func (h *HashQuery) HandleMessage(message IncomingMessageData) error { peer := message.Peer - services := message.Services + mediator := message.Mediator logger := message.Logger config := message.Config - mapLocations, err := services.Storage().GetCachedStorageLocations(h.hash, h.kinds) + mapLocations, err := mediator.GetCachedStorageLocations(h.hash, h.kinds) if err != nil { log.Printf("Error getting cached storage locations: %v", err) return err @@ -111,7 +111,7 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error { availableNodes = append(availableNodes, nodeId) } - score, err := services.P2P().SortNodesByScore(availableNodes) + score, err := mediator.SortNodesByScore(availableNodes) if err != nil { return err } @@ -130,16 +130,16 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error { } } - if services.Storage().ProviderStore() != nil { - if services.Storage().ProviderStore().CanProvide(h.hash, h.kinds) { - location, err := services.Storage().ProviderStore().Provide(h.hash, h.kinds) + if mediator.ProviderStore() != nil { + if mediator.ProviderStore().CanProvide(h.hash, h.kinds) { + location, err := mediator.ProviderStore().Provide(h.hash, h.kinds) if err != nil { return err } message := storage.PrepareProvideMessage(config.KeyPair, h.hash, location) - err = services.Storage().AddStorageLocation(h.hash, services.P2P().NodeId(), location, message) + err = mediator.AddStorageLocation(h.hash, mediator.NodeId(), location, message) if err != nil { return err } @@ -157,7 +157,7 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error { if err != nil { return err } - peersVal, ok := services.P2P().HashQueryRoutingTable().Get(hashString) // Implement HashQueryRoutingTable method + peersVal, ok := mediator.HashQueryRoutingTable().Get(hashString) if ok { peers = peersVal.(*hashset.Set) if !peers.Contains(peer.Id()) { @@ -170,9 +170,9 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error { peerList := hashset.New() peerList.Add(peer.Id()) - services.P2P().HashQueryRoutingTable().Put(hashString, peerList) + mediator.HashQueryRoutingTable().Put(hashString, peerList) - for _, val := range services.P2P().Peers().Values() { + for _, val := range mediator.Peers().Values() { peerVal := val.(net.Peer) if !peerVal.Id().Equals(peer.Id()) { err := peerVal.SendMessage(message.Original) diff --git a/protocol/mediator.go b/protocol/mediator.go new file mode 100644 index 0000000..277ea21 --- /dev/null +++ b/protocol/mediator.go @@ -0,0 +1,29 @@ +package protocol + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/storage" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "net/url" +) + +type Mediator interface { + NetworkId() string + NodeId() *encoding.NodeId + SelfConnectionUris() []*url.URL + SignMessageSimple(message []byte) ([]byte, error) + GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) + SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) + ProviderStore() storage.ProviderStore + AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error + HashQueryRoutingTable() structs.Map + Peers() structs.Map + RegistrySet(sre SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error + RegistryGet(pk []byte) (SignedRegistryEntry, error) + ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error + ServicesStarted() bool + AddPeer(peer net.Peer) error + SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error +} diff --git a/protocol/protocol.go b/protocol/protocol.go index cb5562e..1491f6e 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -5,7 +5,6 @@ import ( "crypto/rand" "git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/service" "github.com/vmihailenco/msgpack/v5" "go.uber.org/zap" "io" @@ -51,11 +50,11 @@ type IncomingMessageData struct { Original []byte Data []byte Ctx context.Context - Services service.Services Logger *zap.Logger Peer net.Peer Config *config.NodeConfig VerifyId bool + Mediator Mediator } type IncomingMessageReader struct { diff --git a/protocol/registry_entry.go b/protocol/registry_entry.go index 2ba52ef..ce9c4de 100644 --- a/protocol/registry_entry.go +++ b/protocol/registry_entry.go @@ -50,7 +50,5 @@ func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message Incom } func (s *RegistryEntryRequest) HandleMessage(message IncomingMessageData) error { - peer := message.Peer - services := message.Services - return services.Registry().Set(s.sre, false, peer) + return message.Mediator.RegistrySet(s.sre, false, message.Peer) } diff --git a/protocol/registry_query.go b/protocol/registry_query.go index b6e3d9e..0010155 100644 --- a/protocol/registry_query.go +++ b/protocol/registry_query.go @@ -50,9 +50,9 @@ func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMess } func (s *RegistryQuery) HandleMessage(message IncomingMessageData) error { - services := message.Services + mediator := message.Mediator peer := message.Peer - sre, err := services.Registry().Get(s.pk) + sre, err := mediator.RegistryGet(s.pk) if err != nil { return err } diff --git a/protocol/signed_announce_peers.go b/protocol/signed_announce_peers.go index 35151b8..d3e82cd 100644 --- a/protocol/signed_announce_peers.go +++ b/protocol/signed_announce_peers.go @@ -105,10 +105,10 @@ func (a *AnnouncePeers) DecodeMessage(dec *msgpack.Decoder, message IncomingMess } func (a AnnouncePeers) HandleMessage(message IncomingMessageDataSigned) error { - services := message.Services + mediator := message.Mediator peer := message.Peer if len(a.connectionUris) > 0 { - err := services.P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer) + err := mediator.ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer) if err != nil { return err } diff --git a/protocol/signed_handshake_done.go b/protocol/signed_handshake_done.go index d20023e..329a4b5 100644 --- a/protocol/signed_handshake_done.go +++ b/protocol/signed_handshake_done.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/utils" "github.com/vmihailenco/msgpack/v5" @@ -76,13 +75,13 @@ func NewHandshakeDone() *HandshakeDone { } func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error { - services := message.Services + mediator := message.Mediator peer := message.Peer verifyId := message.VerifyId nodeId := message.NodeId logger := message.Logger - if !services.IsStarted() { + if !mediator.ServicesStarted() { err := peer.End() if err != nil { return nil @@ -108,7 +107,7 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error { if h.supportedFeatures != types.SupportedFeatures { return fmt.Errorf("Remote node does not support required features") } - err := services.P2P().AddPeer(peer) + err := mediator.AddPeer(peer) if err != nil { return err } @@ -123,7 +122,7 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error { logger.Info(fmt.Sprintf("[+] %s (%s)", peerId, peer.RenderLocationURI())) - err = services.P2P().SendPublicPeersToPeer(peer, []net.Peer{peer}) + err = mediator.ConnectToNode([]*url.URL{h.connectionUris[0]}, false, peer) if err != nil { return err } diff --git a/protocol/storage_location.go b/protocol/storage_location.go index da17f1f..2baabc5 100644 --- a/protocol/storage_location.go +++ b/protocol/storage_location.go @@ -39,7 +39,7 @@ func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message IncomingMe } func (s *StorageLocation) HandleMessage(message IncomingMessageData) error { msg := message.Original - services := message.Services + mediator := message.Mediator peer := message.Peer logger := message.Logger @@ -79,9 +79,7 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error { nodeId := encoding.NewNodeId(publicKey) - // Assuming `node` is an instance of your NodeImpl structure - err := services.Storage().AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg) // Implement AddStorageLocation - + err := mediator.AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg) if err != nil { return fmt.Errorf("Failed to add storage location: %s", err) } @@ -92,7 +90,7 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error { } var list *hashset.Set - listVal, ok := services.P2P().HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method + listVal, ok := mediator.HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method if !ok { list = hashset.New() } else { @@ -109,7 +107,8 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error { if err != nil { return err } - if peerVal, ok := services.P2P().Peers().Get(peerIdStr); ok { + + if peerVal, ok := mediator.Peers().Get(peerIdStr); ok { foundPeer := peerVal.(net.Peer) err := foundPeer.SendMessage(msg) if err != nil { @@ -118,7 +117,7 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error { } } - services.P2P().HashQueryRoutingTable().Remove(hash.HashCode()) + mediator.HashQueryRoutingTable().Remove(hashStr) } return nil diff --git a/service/default/mediator.go b/service/default/mediator.go new file mode 100644 index 0000000..92a9535 --- /dev/null +++ b/service/default/mediator.go @@ -0,0 +1,88 @@ +package _default + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" + "git.lumeweb.com/LumeWeb/libs5-go/service" + "git.lumeweb.com/LumeWeb/libs5-go/storage" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "net/url" +) + +var _ protocol.Mediator = (*MediatorDefault)(nil) + +type MediatorDefault struct { + service.ServiceBase +} + +func (m MediatorDefault) NetworkId() string { + return m.Services().P2P().NetworkId() +} + +func (m MediatorDefault) NodeId() *encoding.NodeId { + return m.Services().P2P().NodeId() +} + +func (m MediatorDefault) SelfConnectionUris() []*url.URL { + return m.Services().P2P().SelfConnectionUris() +} + +func (m MediatorDefault) SignMessageSimple(message []byte) ([]byte, error) { + return m.Services().P2P().SignMessageSimple(message) +} + +func (m MediatorDefault) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { + return m.Services().Storage().GetCachedStorageLocations(hash, kinds) +} + +func (m MediatorDefault) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) { + return m.Services().P2P().SortNodesByScore(nodes) +} + +func (m MediatorDefault) ProviderStore() storage.ProviderStore { + return m.Services().Storage().ProviderStore() +} + +func (m MediatorDefault) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error { + return m.Services().Storage().AddStorageLocation(hash, nodeId, location, message) +} + +func (m MediatorDefault) HashQueryRoutingTable() structs.Map { + return m.Services().P2P().HashQueryRoutingTable() +} + +func (m MediatorDefault) Peers() structs.Map { + return m.Services().P2P().Peers() +} + +func (m MediatorDefault) RegistrySet(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error { + return m.Services().Registry().Set(sre, trusted, receivedFrom) +} + +func (m MediatorDefault) RegistryGet(pk []byte) (protocol.SignedRegistryEntry, error) { + return m.Services().Registry().Get(pk) +} + +func (m MediatorDefault) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error { + return m.Services().P2P().ConnectToNode(connectionUris, retried, fromPeer) +} + +func (m MediatorDefault) ServicesStarted() bool { + return m.Services().IsStarted() +} + +func (m MediatorDefault) AddPeer(peer net.Peer) error { + return m.Services().P2P().AddPeer(peer) +} + +func (m MediatorDefault) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error { + return m.Services().P2P().SendPublicPeersToPeer(peer, peersToSend) +} + +func NewMediator(params service.ServiceParams) *MediatorDefault { + return &MediatorDefault{ + ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db), + } +} diff --git a/service/default/p2p.go b/service/default/p2p.go index 1c00b80..e56c6ad 100644 --- a/service/default/p2p.go +++ b/service/default/p2p.go @@ -439,6 +439,11 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) { Peer: peer, VerifyId: verifyId, Config: p.Config(), + Mediator: NewMediator(service.ServiceParams{ + Logger: p.Logger(), + Config: p.Config(), + Db: p.Db(), + }), } dec := msgpack.NewDecoder(bytes.NewReader(reader.Data))