refactor: more refactoring to break import cycles, introduce a mediator between protocol and service
This commit is contained in:
parent
05ab4e7c0f
commit
a51e3430e1
|
@ -0,0 +1 @@
|
||||||
|
package common
|
|
@ -91,20 +91,20 @@ func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message IncomingMess
|
||||||
|
|
||||||
func (h *HandshakeOpen) HandleMessage(message IncomingMessageData) error {
|
func (h *HandshakeOpen) HandleMessage(message IncomingMessageData) error {
|
||||||
peer := message.Peer
|
peer := message.Peer
|
||||||
services := message.Services
|
mediator := message.Mediator
|
||||||
|
|
||||||
if h.networkId != services.P2P().NetworkId() {
|
if h.networkId != mediator.NetworkId() {
|
||||||
return fmt.Errorf("Peer is in different network: %s", h.networkId)
|
return fmt.Errorf("Peer is in different network: %s", h.networkId)
|
||||||
}
|
}
|
||||||
|
|
||||||
handshake := NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, services.P2P().SelfConnectionUris())
|
handshake := NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, mediator.SelfConnectionUris())
|
||||||
hsMessage, err := msgpack.Marshal(handshake)
|
hsMessage, err := msgpack.Marshal(handshake)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
secureMessage, err := services.P2P().SignMessageSimple(hsMessage)
|
secureMessage, err := mediator.SignMessageSimple(hsMessage)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -89,11 +89,11 @@ func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error {
|
||||||
|
|
||||||
func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
|
func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
|
||||||
peer := message.Peer
|
peer := message.Peer
|
||||||
services := message.Services
|
mediator := message.Mediator
|
||||||
logger := message.Logger
|
logger := message.Logger
|
||||||
config := message.Config
|
config := message.Config
|
||||||
|
|
||||||
mapLocations, err := services.Storage().GetCachedStorageLocations(h.hash, h.kinds)
|
mapLocations, err := mediator.GetCachedStorageLocations(h.hash, h.kinds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error getting cached storage locations: %v", err)
|
log.Printf("Error getting cached storage locations: %v", err)
|
||||||
return err
|
return err
|
||||||
|
@ -111,7 +111,7 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
|
||||||
availableNodes = append(availableNodes, nodeId)
|
availableNodes = append(availableNodes, nodeId)
|
||||||
}
|
}
|
||||||
|
|
||||||
score, err := services.P2P().SortNodesByScore(availableNodes)
|
score, err := mediator.SortNodesByScore(availableNodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -130,16 +130,16 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if services.Storage().ProviderStore() != nil {
|
if mediator.ProviderStore() != nil {
|
||||||
if services.Storage().ProviderStore().CanProvide(h.hash, h.kinds) {
|
if mediator.ProviderStore().CanProvide(h.hash, h.kinds) {
|
||||||
location, err := services.Storage().ProviderStore().Provide(h.hash, h.kinds)
|
location, err := mediator.ProviderStore().Provide(h.hash, h.kinds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
message := storage.PrepareProvideMessage(config.KeyPair, h.hash, location)
|
message := storage.PrepareProvideMessage(config.KeyPair, h.hash, location)
|
||||||
|
|
||||||
err = services.Storage().AddStorageLocation(h.hash, services.P2P().NodeId(), location, message)
|
err = mediator.AddStorageLocation(h.hash, mediator.NodeId(), location, message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -157,7 +157,7 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
peersVal, ok := services.P2P().HashQueryRoutingTable().Get(hashString) // Implement HashQueryRoutingTable method
|
peersVal, ok := mediator.HashQueryRoutingTable().Get(hashString)
|
||||||
if ok {
|
if ok {
|
||||||
peers = peersVal.(*hashset.Set)
|
peers = peersVal.(*hashset.Set)
|
||||||
if !peers.Contains(peer.Id()) {
|
if !peers.Contains(peer.Id()) {
|
||||||
|
@ -170,9 +170,9 @@ func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
|
||||||
peerList := hashset.New()
|
peerList := hashset.New()
|
||||||
peerList.Add(peer.Id())
|
peerList.Add(peer.Id())
|
||||||
|
|
||||||
services.P2P().HashQueryRoutingTable().Put(hashString, peerList)
|
mediator.HashQueryRoutingTable().Put(hashString, peerList)
|
||||||
|
|
||||||
for _, val := range services.P2P().Peers().Values() {
|
for _, val := range mediator.Peers().Values() {
|
||||||
peerVal := val.(net.Peer)
|
peerVal := val.(net.Peer)
|
||||||
if !peerVal.Id().Equals(peer.Id()) {
|
if !peerVal.Id().Equals(peer.Id()) {
|
||||||
err := peerVal.SendMessage(message.Original)
|
err := peerVal.SendMessage(message.Original)
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
package protocol
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/storage"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/structs"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Mediator interface {
|
||||||
|
NetworkId() string
|
||||||
|
NodeId() *encoding.NodeId
|
||||||
|
SelfConnectionUris() []*url.URL
|
||||||
|
SignMessageSimple(message []byte) ([]byte, error)
|
||||||
|
GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error)
|
||||||
|
SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error)
|
||||||
|
ProviderStore() storage.ProviderStore
|
||||||
|
AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error
|
||||||
|
HashQueryRoutingTable() structs.Map
|
||||||
|
Peers() structs.Map
|
||||||
|
RegistrySet(sre SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error
|
||||||
|
RegistryGet(pk []byte) (SignedRegistryEntry, error)
|
||||||
|
ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error
|
||||||
|
ServicesStarted() bool
|
||||||
|
AddPeer(peer net.Peer) error
|
||||||
|
SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error
|
||||||
|
}
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/config"
|
"git.lumeweb.com/LumeWeb/libs5-go/config"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/service"
|
|
||||||
"github.com/vmihailenco/msgpack/v5"
|
"github.com/vmihailenco/msgpack/v5"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"io"
|
"io"
|
||||||
|
@ -51,11 +50,11 @@ type IncomingMessageData struct {
|
||||||
Original []byte
|
Original []byte
|
||||||
Data []byte
|
Data []byte
|
||||||
Ctx context.Context
|
Ctx context.Context
|
||||||
Services service.Services
|
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
Peer net.Peer
|
Peer net.Peer
|
||||||
Config *config.NodeConfig
|
Config *config.NodeConfig
|
||||||
VerifyId bool
|
VerifyId bool
|
||||||
|
Mediator Mediator
|
||||||
}
|
}
|
||||||
|
|
||||||
type IncomingMessageReader struct {
|
type IncomingMessageReader struct {
|
||||||
|
|
|
@ -50,7 +50,5 @@ func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message Incom
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RegistryEntryRequest) HandleMessage(message IncomingMessageData) error {
|
func (s *RegistryEntryRequest) HandleMessage(message IncomingMessageData) error {
|
||||||
peer := message.Peer
|
return message.Mediator.RegistrySet(s.sre, false, message.Peer)
|
||||||
services := message.Services
|
|
||||||
return services.Registry().Set(s.sre, false, peer)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,9 +50,9 @@ func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMess
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RegistryQuery) HandleMessage(message IncomingMessageData) error {
|
func (s *RegistryQuery) HandleMessage(message IncomingMessageData) error {
|
||||||
services := message.Services
|
mediator := message.Mediator
|
||||||
peer := message.Peer
|
peer := message.Peer
|
||||||
sre, err := services.Registry().Get(s.pk)
|
sre, err := mediator.RegistryGet(s.pk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,10 +105,10 @@ func (a *AnnouncePeers) DecodeMessage(dec *msgpack.Decoder, message IncomingMess
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a AnnouncePeers) HandleMessage(message IncomingMessageDataSigned) error {
|
func (a AnnouncePeers) HandleMessage(message IncomingMessageDataSigned) error {
|
||||||
services := message.Services
|
mediator := message.Mediator
|
||||||
peer := message.Peer
|
peer := message.Peer
|
||||||
if len(a.connectionUris) > 0 {
|
if len(a.connectionUris) > 0 {
|
||||||
err := services.P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer)
|
err := mediator.ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/utils"
|
"git.lumeweb.com/LumeWeb/libs5-go/utils"
|
||||||
"github.com/vmihailenco/msgpack/v5"
|
"github.com/vmihailenco/msgpack/v5"
|
||||||
|
@ -76,13 +75,13 @@ func NewHandshakeDone() *HandshakeDone {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error {
|
func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error {
|
||||||
services := message.Services
|
mediator := message.Mediator
|
||||||
peer := message.Peer
|
peer := message.Peer
|
||||||
verifyId := message.VerifyId
|
verifyId := message.VerifyId
|
||||||
nodeId := message.NodeId
|
nodeId := message.NodeId
|
||||||
logger := message.Logger
|
logger := message.Logger
|
||||||
|
|
||||||
if !services.IsStarted() {
|
if !mediator.ServicesStarted() {
|
||||||
err := peer.End()
|
err := peer.End()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -108,7 +107,7 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error {
|
||||||
if h.supportedFeatures != types.SupportedFeatures {
|
if h.supportedFeatures != types.SupportedFeatures {
|
||||||
return fmt.Errorf("Remote node does not support required features")
|
return fmt.Errorf("Remote node does not support required features")
|
||||||
}
|
}
|
||||||
err := services.P2P().AddPeer(peer)
|
err := mediator.AddPeer(peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -123,7 +122,7 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error {
|
||||||
|
|
||||||
logger.Info(fmt.Sprintf("[+] %s (%s)", peerId, peer.RenderLocationURI()))
|
logger.Info(fmt.Sprintf("[+] %s (%s)", peerId, peer.RenderLocationURI()))
|
||||||
|
|
||||||
err = services.P2P().SendPublicPeersToPeer(peer, []net.Peer{peer})
|
err = mediator.ConnectToNode([]*url.URL{h.connectionUris[0]}, false, peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message IncomingMe
|
||||||
}
|
}
|
||||||
func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
|
func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
|
||||||
msg := message.Original
|
msg := message.Original
|
||||||
services := message.Services
|
mediator := message.Mediator
|
||||||
peer := message.Peer
|
peer := message.Peer
|
||||||
logger := message.Logger
|
logger := message.Logger
|
||||||
|
|
||||||
|
@ -79,9 +79,7 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
|
||||||
|
|
||||||
nodeId := encoding.NewNodeId(publicKey)
|
nodeId := encoding.NewNodeId(publicKey)
|
||||||
|
|
||||||
// Assuming `node` is an instance of your NodeImpl structure
|
err := mediator.AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg)
|
||||||
err := services.Storage().AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg) // Implement AddStorageLocation
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to add storage location: %s", err)
|
return fmt.Errorf("Failed to add storage location: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -92,7 +90,7 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
var list *hashset.Set
|
var list *hashset.Set
|
||||||
listVal, ok := services.P2P().HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method
|
listVal, ok := mediator.HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method
|
||||||
if !ok {
|
if !ok {
|
||||||
list = hashset.New()
|
list = hashset.New()
|
||||||
} else {
|
} else {
|
||||||
|
@ -109,7 +107,8 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if peerVal, ok := services.P2P().Peers().Get(peerIdStr); ok {
|
|
||||||
|
if peerVal, ok := mediator.Peers().Get(peerIdStr); ok {
|
||||||
foundPeer := peerVal.(net.Peer)
|
foundPeer := peerVal.(net.Peer)
|
||||||
err := foundPeer.SendMessage(msg)
|
err := foundPeer.SendMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -118,7 +117,7 @@ func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
services.P2P().HashQueryRoutingTable().Remove(hash.HashCode())
|
mediator.HashQueryRoutingTable().Remove(hashStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -0,0 +1,88 @@
|
||||||
|
package _default
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/service"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/storage"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/structs"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ protocol.Mediator = (*MediatorDefault)(nil)
|
||||||
|
|
||||||
|
type MediatorDefault struct {
|
||||||
|
service.ServiceBase
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) NetworkId() string {
|
||||||
|
return m.Services().P2P().NetworkId()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) NodeId() *encoding.NodeId {
|
||||||
|
return m.Services().P2P().NodeId()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) SelfConnectionUris() []*url.URL {
|
||||||
|
return m.Services().P2P().SelfConnectionUris()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) SignMessageSimple(message []byte) ([]byte, error) {
|
||||||
|
return m.Services().P2P().SignMessageSimple(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) {
|
||||||
|
return m.Services().Storage().GetCachedStorageLocations(hash, kinds)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) {
|
||||||
|
return m.Services().P2P().SortNodesByScore(nodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) ProviderStore() storage.ProviderStore {
|
||||||
|
return m.Services().Storage().ProviderStore()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error {
|
||||||
|
return m.Services().Storage().AddStorageLocation(hash, nodeId, location, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) HashQueryRoutingTable() structs.Map {
|
||||||
|
return m.Services().P2P().HashQueryRoutingTable()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) Peers() structs.Map {
|
||||||
|
return m.Services().P2P().Peers()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) RegistrySet(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error {
|
||||||
|
return m.Services().Registry().Set(sre, trusted, receivedFrom)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) RegistryGet(pk []byte) (protocol.SignedRegistryEntry, error) {
|
||||||
|
return m.Services().Registry().Get(pk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error {
|
||||||
|
return m.Services().P2P().ConnectToNode(connectionUris, retried, fromPeer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) ServicesStarted() bool {
|
||||||
|
return m.Services().IsStarted()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) AddPeer(peer net.Peer) error {
|
||||||
|
return m.Services().P2P().AddPeer(peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MediatorDefault) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error {
|
||||||
|
return m.Services().P2P().SendPublicPeersToPeer(peer, peersToSend)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMediator(params service.ServiceParams) *MediatorDefault {
|
||||||
|
return &MediatorDefault{
|
||||||
|
ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db),
|
||||||
|
}
|
||||||
|
}
|
|
@ -439,6 +439,11 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) {
|
||||||
Peer: peer,
|
Peer: peer,
|
||||||
VerifyId: verifyId,
|
VerifyId: verifyId,
|
||||||
Config: p.Config(),
|
Config: p.Config(),
|
||||||
|
Mediator: NewMediator(service.ServiceParams{
|
||||||
|
Logger: p.Logger(),
|
||||||
|
Config: p.Config(),
|
||||||
|
Db: p.Db(),
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
dec := msgpack.NewDecoder(bytes.NewReader(reader.Data))
|
dec := msgpack.NewDecoder(bytes.NewReader(reader.Data))
|
||||||
|
|
Loading…
Reference in New Issue