feat: implement HashQuery HandleMessage
This commit is contained in:
parent
eefbfa06d0
commit
16ce7338bd
|
@ -4,7 +4,10 @@ import (
|
||||||
libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
|
libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
||||||
|
"github.com/emirpasic/gods/sets/hashset"
|
||||||
"github.com/vmihailenco/msgpack/v5"
|
"github.com/vmihailenco/msgpack/v5"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ IncomingMessageTyped = (*HashQuery)(nil)
|
var _ IncomingMessageTyped = (*HashQuery)(nil)
|
||||||
|
@ -45,6 +48,64 @@ func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (h *HashQuery) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
|
func (h *HashQuery) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
|
||||||
|
mapLocations, err := node.GetCachedStorageLocations(h.hash, h.kinds)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error getting cached storage locations: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(mapLocations) > 0 {
|
||||||
|
availableNodes := make([]*encoding.NodeId, 0, len(mapLocations))
|
||||||
|
for key := range mapLocations {
|
||||||
|
nodeId, err := encoding.DecodeNodeId(key)
|
||||||
|
if err != nil {
|
||||||
|
node.Logger().Error("Error decoding node id", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
availableNodes = append(availableNodes, nodeId)
|
||||||
|
}
|
||||||
|
|
||||||
|
score, err := node.Services().P2P().SortNodesByScore(availableNodes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sortedNodeId, err := score[0].ToString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
entry, exists := mapLocations[sortedNodeId]
|
||||||
|
if exists {
|
||||||
|
err := (*peer).SendMessage(entry.ProviderMessage)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var peers *hashset.Set
|
||||||
|
peersVal, ok := node.HashQueryRoutingTable().Get(h.hash.HashCode()) // Implement HashQueryRoutingTable method
|
||||||
|
if !ok {
|
||||||
|
peers = hashset.New()
|
||||||
|
}
|
||||||
|
|
||||||
|
peers = peersVal.(*hashset.Set)
|
||||||
|
|
||||||
|
if exists := peers.Contains((*peer).GetId()); !exists {
|
||||||
|
peers.Add((*peer).GetId())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, val := range node.Services().P2P().Peers().Values() {
|
||||||
|
peerVal := val.(net.Peer)
|
||||||
|
if !peerVal.GetId().Equals((*peer).GetId()) {
|
||||||
|
err := peerVal.SendMessage(h.IncomingMessageImpl.Original())
|
||||||
|
if err != nil {
|
||||||
|
node.Logger().Error("Failed to send message", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,9 +25,18 @@ type IncomingMessageTyped interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type IncomingMessageImpl struct {
|
type IncomingMessageImpl struct {
|
||||||
kind types.ProtocolMethod
|
kind types.ProtocolMethod
|
||||||
data msgpack.RawMessage
|
data msgpack.RawMessage
|
||||||
known bool
|
original []byte
|
||||||
|
known bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *IncomingMessageImpl) SetOriginal(original []byte) {
|
||||||
|
i.original = original
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *IncomingMessageImpl) Original() []byte {
|
||||||
|
return i.original
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *IncomingMessageImpl) SetIncomingMessage(msg IncomingMessage) {
|
func (i *IncomingMessageImpl) SetIncomingMessage(msg IncomingMessage) {
|
||||||
|
|
Loading…
Reference in New Issue