libs5-go/protocol/hash_query.go

125 lines
2.6 KiB
Go
Raw Normal View History

2024-01-06 11:33:46 +00:00
package protocol
import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
2024-01-07 08:58:22 +00:00
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
2024-01-06 11:33:46 +00:00
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"github.com/emirpasic/gods/sets/hashset"
2024-01-06 11:33:46 +00:00
"github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap"
"log"
2024-01-06 11:33:46 +00:00
)
var _ base.IncomingMessageTyped = (*HashQuery)(nil)
2024-01-06 11:33:46 +00:00
type HashQuery struct {
hash *encoding.Multihash
kinds []int
base.IncomingMessageTypedImpl
base.IncomingMessageHandler
2024-01-06 11:33:46 +00:00
}
func NewHashQuery() *HashQuery {
return &HashQuery{}
}
2024-01-06 11:33:46 +00:00
func (h HashQuery) Hash() *encoding.Multihash {
return h.hash
}
func (h HashQuery) Kinds() []int {
return h.kinds
}
func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder) error {
hash, err := dec.DecodeBytes()
if err != nil {
return err
}
h.hash = encoding.NewMultihash(hash)
var kinds []int
err = dec.Decode(&kinds)
if err != nil {
return err
}
h.kinds = kinds
return nil
}
func (h *HashQuery) HandleMessage(node interfaces.Node, peer net.Peer, verifyId bool) error {
mapLocations, err := node.GetCachedStorageLocations(h.hash, h.kinds)
if err != nil {
log.Printf("Error getting cached storage locations: %v", err)
return err
}
if len(mapLocations) > 0 {
availableNodes := make([]*encoding.NodeId, 0, len(mapLocations))
for key := range mapLocations {
nodeId, err := encoding.DecodeNodeId(key)
if err != nil {
node.Logger().Error("Error decoding node id", zap.Error(err))
continue
}
availableNodes = append(availableNodes, nodeId)
}
score, err := node.Services().P2P().SortNodesByScore(availableNodes)
if err != nil {
return err
}
2024-01-07 08:58:22 +00:00
sortedNodeId, err := (*score[0]).ToString()
if err != nil {
return err
}
entry, exists := mapLocations[sortedNodeId]
if exists {
err := peer.SendMessage(entry.ProviderMessage())
if err != nil {
return err
}
}
}
var peers *hashset.Set
hashString, err := h.hash.ToString()
if err != nil {
return err
}
peersVal, ok := node.HashQueryRoutingTable().Get(hashString) // Implement HashQueryRoutingTable method
if ok {
peers = peersVal.(*hashset.Set)
if !peers.Contains(peer.Id()) {
peers.Add(peer.Id())
}
return nil
}
peerList := hashset.New()
peerList.Add(peer.Id())
node.HashQueryRoutingTable().Put(hashString, peerList)
for _, val := range node.Services().P2P().Peers().Values() {
peerVal := val.(net.Peer)
if !peerVal.Id().Equals(peer.Id()) {
err := peerVal.SendMessage(h.IncomingMessageImpl.Original())
if err != nil {
node.Logger().Error("Failed to send message", zap.Error(err))
}
}
}
2024-01-06 11:33:46 +00:00
return nil
}