libs5-go/protocol/storage_location.go

125 lines
2.9 KiB
Go
Raw Normal View History

package protocol
import (
"crypto/ed25519"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/emirpasic/gods/sets/hashset"
"github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap"
)
2024-01-30 03:25:21 +00:00
var _ IncomingMessage = (*StorageLocation)(nil)
type StorageLocation struct {
hash *encoding.Multihash
kind int
expiry int64
parts []string
publicKey []byte
signature []byte
2024-01-30 03:25:21 +00:00
HandshakeRequirement
}
func NewStorageLocation() *StorageLocation {
sl := &StorageLocation{}
sl.SetRequiresHandshake(true)
return sl
}
2024-01-30 03:25:21 +00:00
func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
// nop, we use the incoming message -> original already stored
return nil
}
2024-01-30 03:25:21 +00:00
func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
msg := message.Original
mediator := message.Mediator
peer := message.Peer
logger := message.Logger
hash := encoding.NewMultihash(msg[1:34]) // Replace NewMultihash with appropriate function
typeOfData := msg[34]
expiry := utils.DecodeEndian(msg[35:39])
partCount := msg[39]
parts := []string{}
cursor := 40
for i := 0; i < int(partCount); i++ {
length := utils.DecodeEndian(msg[cursor : cursor+2])
cursor += 2
if len(msg) < cursor+int(length) {
return fmt.Errorf("Invalid message")
}
part := string(msg[cursor : cursor+int(length)])
parts = append(parts, part)
cursor += int(length)
}
cursor++
publicKey := msg[cursor : cursor+33]
signature := msg[cursor+33:]
if types.HashType(publicKey[0]) != types.HashTypeEd25519 { // Replace CID_HASH_TYPES_ED25519 with actual constant
return fmt.Errorf("Unsupported public key type %d", publicKey[0])
}
if !ed25519.Verify(publicKey[1:], msg[:cursor], signature) {
return fmt.Errorf("Signature verification failed")
}
nodeId := encoding.NewNodeId(publicKey)
err := mediator.AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg)
if err != nil {
return fmt.Errorf("Failed to add storage location: %s", err)
}
hashStr, err := hash.ToString()
if err != nil {
return err
}
var list *hashset.Set
listVal, ok := mediator.HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method
if !ok {
list = hashset.New()
} else {
list = listVal.(*hashset.Set)
}
for _, peerIdVal := range list.Values() {
peerId := peerIdVal.(*encoding.NodeId)
if peerId.Equals(nodeId) || peerId.Equals(peer) {
continue
}
peerIdStr, err := peerId.ToString()
if err != nil {
return err
}
if peerVal, ok := mediator.Peers().Get(peerIdStr); ok {
foundPeer := peerVal.(net.Peer)
err := foundPeer.SendMessage(msg)
if err != nil {
logger.Error("Failed to send message", zap.Error(err))
continue
}
}
mediator.HashQueryRoutingTable().Remove(hashStr)
}
return nil
}