libs5-go/protocol/hash_query.go

182 lines
3.7 KiB
Go

package protocol
import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap"
"log"
)
var _ EncodeableMessage = (*HashQuery)(nil)
var _ IncomingMessage = (*HashQuery)(nil)
type HashQuery struct {
hash *encoding.Multihash
kinds []types.StorageLocationType
HandshakeRequirement
}
func NewHashQuery() *HashQuery {
hq := &HashQuery{}
hq.SetRequiresHandshake(true)
return hq
}
func NewHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) *HashQuery {
if len(kinds) == 0 {
kinds = []types.StorageLocationType{types.StorageLocationTypeFile}
}
return &HashQuery{
hash: hash,
kinds: kinds,
}
}
func (h HashQuery) Hash() *encoding.Multihash {
return h.hash
}
func (h HashQuery) Kinds() []types.StorageLocationType {
return h.kinds
}
func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
hash, err := dec.DecodeBytes()
if err != nil {
return err
}
h.hash = encoding.NewMultihash(hash)
var kinds []types.StorageLocationType
err = dec.Decode(&kinds)
if err != nil {
return err
}
h.kinds = kinds
return nil
}
func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error {
err := enc.EncodeInt(int64(types.ProtocolMethodHashQuery))
if err != nil {
return err
}
err = enc.EncodeBytes(h.hash.FullBytes())
if err != nil {
return err
}
err = enc.Encode(h.kinds)
if err != nil {
return err
}
return nil
}
func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
peer := message.Peer
mediator := message.Mediator
logger := message.Logger
config := message.Config
mapLocations, err := mediator.GetCachedStorageLocations(h.hash, h.kinds)
if err != nil {
log.Printf("Error getting cached storage locations: %v", err)
return err
}
if len(mapLocations) > 0 {
availableNodes := make([]*encoding.NodeId, 0, len(mapLocations))
for key := range mapLocations {
nodeId, err := encoding.DecodeNodeId(key)
if err != nil {
logger.Error("Error decoding node id", zap.Error(err))
continue
}
availableNodes = append(availableNodes, nodeId)
}
score, err := mediator.SortNodesByScore(availableNodes)
if err != nil {
return err
}
sortedNodeId, err := (*score[0]).ToString()
if err != nil {
return err
}
entry, exists := mapLocations[sortedNodeId]
if exists {
err := peer.SendMessage(entry.ProviderMessage())
if err != nil {
return err
}
}
}
if mediator.ProviderStore() != nil {
if mediator.ProviderStore().CanProvide(h.hash, h.kinds) {
location, err := mediator.ProviderStore().Provide(h.hash, h.kinds)
if err != nil {
return err
}
message := storage.PrepareProvideMessage(config.KeyPair, h.hash, location)
err = peer.SendMessage(message)
if err != nil {
return err
}
}
}
var peers *structs.SetImpl
hashString, err := h.hash.ToString()
logger.Debug("HashQuery", zap.Any("hashString", hashString))
if err != nil {
return err
}
peersVal, ok := mediator.HashQueryRoutingTable().Get(hashString)
if ok {
peers = peersVal.(*structs.SetImpl)
if !peers.Contains(peer.Id()) {
peers.Add(peer.Id())
}
return nil
}
peerList := structs.NewSet()
peerList.Add(peer.Id())
mediator.HashQueryRoutingTable().Put(hashString, peerList)
for _, val := range mediator.Peers().Values() {
peerVal := val.(net.Peer)
if !peerVal.Id().Equals(peer.Id()) {
err := peerVal.SendMessage(message.Original)
if err != nil {
logger.Error("Failed to send message", zap.Error(err))
}
}
}
return nil
}