diff --git a/protocol/hash_query.go b/protocol/hash_query.go index f824027..c3aa80c 100644 --- a/protocol/hash_query.go +++ b/protocol/hash_query.go @@ -4,7 +4,10 @@ import ( libs5_go "git.lumeweb.com/LumeWeb/libs5-go" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" + "github.com/emirpasic/gods/sets/hashset" "github.com/vmihailenco/msgpack/v5" + "go.uber.org/zap" + "log" ) var _ IncomingMessageTyped = (*HashQuery)(nil) @@ -45,6 +48,64 @@ func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder) error { return nil } func (h *HashQuery) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { + mapLocations, err := node.GetCachedStorageLocations(h.hash, h.kinds) + if err != nil { + log.Printf("Error getting cached storage locations: %v", err) + return err + } + + if len(mapLocations) > 0 { + availableNodes := make([]*encoding.NodeId, 0, len(mapLocations)) + for key := range mapLocations { + nodeId, err := encoding.DecodeNodeId(key) + if err != nil { + node.Logger().Error("Error decoding node id", zap.Error(err)) + continue + } + + availableNodes = append(availableNodes, nodeId) + } + + score, err := node.Services().P2P().SortNodesByScore(availableNodes) + if err != nil { + return err + } + + sortedNodeId, err := score[0].ToString() + if err != nil { + return err + } + + entry, exists := mapLocations[sortedNodeId] + if exists { + err := (*peer).SendMessage(entry.ProviderMessage) + if err != nil { + return err + } + } + } + + var peers *hashset.Set + peersVal, ok := node.HashQueryRoutingTable().Get(h.hash.HashCode()) // Implement HashQueryRoutingTable method + if !ok { + peers = hashset.New() + } + + peers = peersVal.(*hashset.Set) + + if exists := peers.Contains((*peer).GetId()); !exists { + peers.Add((*peer).GetId()) + } + + for _, val := range node.Services().P2P().Peers().Values() { + peerVal := val.(net.Peer) + if !peerVal.GetId().Equals((*peer).GetId()) { + err := peerVal.SendMessage(h.IncomingMessageImpl.Original()) + if err != nil { + node.Logger().Error("Failed to send message", zap.Error(err)) + } + } + } return nil } diff --git a/protocol/incoming_message.go b/protocol/incoming_message.go index d8519ab..b0b8bba 100644 --- a/protocol/incoming_message.go +++ b/protocol/incoming_message.go @@ -25,9 +25,18 @@ type IncomingMessageTyped interface { } type IncomingMessageImpl struct { - kind types.ProtocolMethod - data msgpack.RawMessage - known bool + kind types.ProtocolMethod + data msgpack.RawMessage + original []byte + known bool +} + +func (i *IncomingMessageImpl) SetOriginal(original []byte) { + i.original = original +} + +func (i *IncomingMessageImpl) Original() []byte { + return i.original } func (i *IncomingMessageImpl) SetIncomingMessage(msg IncomingMessage) {