2024-01-06 11:33:46 +00:00
|
|
|
package protocol
|
|
|
|
|
|
|
|
import (
|
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
2024-01-29 23:53:32 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/storage"
|
2024-01-09 11:56:44 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
2024-01-06 15:54:38 +00:00
|
|
|
"github.com/emirpasic/gods/sets/hashset"
|
2024-01-06 11:33:46 +00:00
|
|
|
"github.com/vmihailenco/msgpack/v5"
|
2024-01-06 15:54:38 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
"log"
|
2024-01-06 11:33:46 +00:00
|
|
|
)
|
|
|
|
|
2024-01-30 03:28:39 +00:00
|
|
|
var _ EncodeableMessage = (*HashQuery)(nil)
|
2024-01-30 03:25:21 +00:00
|
|
|
var _ IncomingMessage = (*HashQuery)(nil)
|
2024-01-06 11:33:46 +00:00
|
|
|
|
|
|
|
type HashQuery struct {
|
|
|
|
hash *encoding.Multihash
|
2024-01-09 11:56:44 +00:00
|
|
|
kinds []types.StorageLocationType
|
2024-01-30 03:25:21 +00:00
|
|
|
HandshakeRequirement
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
2024-01-08 17:41:38 +00:00
|
|
|
func NewHashQuery() *HashQuery {
|
2024-01-13 16:22:01 +00:00
|
|
|
hq := &HashQuery{}
|
|
|
|
|
|
|
|
hq.SetRequiresHandshake(true)
|
|
|
|
|
|
|
|
return hq
|
2024-01-08 17:41:38 +00:00
|
|
|
}
|
|
|
|
|
2024-01-09 11:57:06 +00:00
|
|
|
func NewHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) *HashQuery {
|
|
|
|
if len(kinds) == 0 {
|
|
|
|
kinds = []types.StorageLocationType{types.StorageLocationTypeFile}
|
|
|
|
}
|
|
|
|
return &HashQuery{
|
|
|
|
hash: hash,
|
|
|
|
kinds: kinds,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-06 11:33:46 +00:00
|
|
|
func (h HashQuery) Hash() *encoding.Multihash {
|
|
|
|
return h.hash
|
|
|
|
}
|
|
|
|
|
2024-01-09 11:56:44 +00:00
|
|
|
func (h HashQuery) Kinds() []types.StorageLocationType {
|
2024-01-06 11:33:46 +00:00
|
|
|
return h.kinds
|
|
|
|
}
|
|
|
|
|
2024-01-30 03:25:21 +00:00
|
|
|
func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
|
2024-01-06 11:33:46 +00:00
|
|
|
hash, err := dec.DecodeBytes()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
h.hash = encoding.NewMultihash(hash)
|
|
|
|
|
2024-01-09 11:56:44 +00:00
|
|
|
var kinds []types.StorageLocationType
|
2024-01-06 11:33:46 +00:00
|
|
|
err = dec.Decode(&kinds)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
h.kinds = kinds
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2024-01-09 12:07:00 +00:00
|
|
|
|
|
|
|
func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error {
|
|
|
|
err := enc.EncodeInt(int64(types.ProtocolMethodHashQuery))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = enc.EncodeBytes(h.hash.FullBytes())
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = enc.Encode(h.kinds)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-01-30 03:25:21 +00:00
|
|
|
func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
|
2024-01-29 04:39:40 +00:00
|
|
|
peer := message.Peer
|
2024-01-30 05:31:31 +00:00
|
|
|
mediator := message.Mediator
|
2024-01-29 06:55:36 +00:00
|
|
|
logger := message.Logger
|
2024-01-29 23:53:32 +00:00
|
|
|
config := message.Config
|
2024-01-29 04:39:40 +00:00
|
|
|
|
2024-01-30 05:31:31 +00:00
|
|
|
mapLocations, err := mediator.GetCachedStorageLocations(h.hash, h.kinds)
|
2024-01-06 15:54:38 +00:00
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error getting cached storage locations: %v", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(mapLocations) > 0 {
|
|
|
|
availableNodes := make([]*encoding.NodeId, 0, len(mapLocations))
|
|
|
|
for key := range mapLocations {
|
|
|
|
nodeId, err := encoding.DecodeNodeId(key)
|
|
|
|
if err != nil {
|
2024-01-29 06:55:36 +00:00
|
|
|
logger.Error("Error decoding node id", zap.Error(err))
|
2024-01-06 15:54:38 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
availableNodes = append(availableNodes, nodeId)
|
|
|
|
}
|
|
|
|
|
2024-01-30 05:31:31 +00:00
|
|
|
score, err := mediator.SortNodesByScore(availableNodes)
|
2024-01-06 15:54:38 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:58:22 +00:00
|
|
|
sortedNodeId, err := (*score[0]).ToString()
|
2024-01-06 15:54:38 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
entry, exists := mapLocations[sortedNodeId]
|
|
|
|
if exists {
|
2024-01-07 11:35:41 +00:00
|
|
|
err := peer.SendMessage(entry.ProviderMessage())
|
2024-01-06 15:54:38 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-30 05:31:31 +00:00
|
|
|
if mediator.ProviderStore() != nil {
|
|
|
|
if mediator.ProviderStore().CanProvide(h.hash, h.kinds) {
|
|
|
|
location, err := mediator.ProviderStore().Provide(h.hash, h.kinds)
|
2024-01-24 08:01:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-01-29 23:53:32 +00:00
|
|
|
message := storage.PrepareProvideMessage(config.KeyPair, h.hash, location)
|
2024-01-24 08:01:59 +00:00
|
|
|
|
2024-01-30 05:31:31 +00:00
|
|
|
err = mediator.AddStorageLocation(h.hash, mediator.NodeId(), location, message)
|
2024-01-24 08:01:59 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = peer.SendMessage(message)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-06 15:54:38 +00:00
|
|
|
var peers *hashset.Set
|
2024-01-08 17:41:38 +00:00
|
|
|
hashString, err := h.hash.ToString()
|
2024-01-29 06:55:36 +00:00
|
|
|
logger.Debug("HashQuery", zap.Any("hashString", hashString))
|
2024-01-08 17:41:38 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2024-01-06 15:54:38 +00:00
|
|
|
}
|
2024-01-30 05:31:31 +00:00
|
|
|
peersVal, ok := mediator.HashQueryRoutingTable().Get(hashString)
|
2024-01-08 17:41:38 +00:00
|
|
|
if ok {
|
|
|
|
peers = peersVal.(*hashset.Set)
|
|
|
|
if !peers.Contains(peer.Id()) {
|
|
|
|
peers.Add(peer.Id())
|
|
|
|
}
|
2024-01-06 15:54:38 +00:00
|
|
|
|
2024-01-08 17:41:38 +00:00
|
|
|
return nil
|
2024-01-06 15:54:38 +00:00
|
|
|
}
|
|
|
|
|
2024-01-08 17:41:38 +00:00
|
|
|
peerList := hashset.New()
|
|
|
|
peerList.Add(peer.Id())
|
|
|
|
|
2024-01-30 05:31:31 +00:00
|
|
|
mediator.HashQueryRoutingTable().Put(hashString, peerList)
|
2024-01-08 17:41:38 +00:00
|
|
|
|
2024-01-30 05:31:31 +00:00
|
|
|
for _, val := range mediator.Peers().Values() {
|
2024-01-06 15:54:38 +00:00
|
|
|
peerVal := val.(net.Peer)
|
2024-01-07 11:35:41 +00:00
|
|
|
if !peerVal.Id().Equals(peer.Id()) {
|
2024-01-29 04:39:40 +00:00
|
|
|
err := peerVal.SendMessage(message.Original)
|
2024-01-06 15:54:38 +00:00
|
|
|
if err != nil {
|
2024-01-29 06:55:36 +00:00
|
|
|
logger.Error("Failed to send message", zap.Error(err))
|
2024-01-06 15:54:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2024-01-06 11:33:46 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|