From 4959270f51d6d788cc3cfa4482e023ba087b8499 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 6 Jan 2024 09:45:00 -0500 Subject: [PATCH] feat: wip initial storage location support --- node.go | 124 ++++++++++++----------------------- protocol/storage_location.go | 109 ++++++++++++++++++++++++++++++ storage.go | 112 +++++++++++++++++++++++++++++++ 3 files changed, 262 insertions(+), 83 deletions(-) create mode 100644 protocol/storage_location.go create mode 100644 storage.go diff --git a/node.go b/node.go index dcae551..3a3a1d3 100644 --- a/node.go +++ b/node.go @@ -58,45 +58,46 @@ func (n *Node) Db() *bolt.DB { } /* -func (n *Node) Services() *S5Services { - if n.nodeConfig != nil { - return n.nodeConfig.Services + func (n *Node) Services() *S5Services { + if n.nodeConfig != nil { + return n.nodeConfig.Services + } + return nil } - return nil -} + func (n *Node) Start() error { + n.started = true + return nil + } -func (n *Node) Start() error { - n.started = true - return nil -} + func (n *Node) Stop() error { + n.started = false + return nil + } +*/ +func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, types []int) (map[encoding.NodeIdCode]*StorageLocation, error) { + locations := make(map[encoding.NodeIdCode]*StorageLocation) -func (n *Node) Stop() error { - n.started = false - return nil -} -func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeId]*StorageLocation, error) { - locations := make(map[NodeId]*StorageLocation) - - mapFromDB, err := n.readStorageLocationsFromDB(hash) + locationMap, err := n.readStorageLocationsFromDB(hash) if err != nil { return nil, err } - if len(mapFromDB) == 0 { - return make(map[NodeId]*StorageLocation), nil + if len(locationMap) == 0 { + return make(map[encoding.NodeIdCode]*StorageLocation), nil } ts := time.Now().Unix() for _, t := range types { - nodeMap, ok := mapFromDB[t] + + nodeMap, ok := (locationMap)[t] if !ok { continue } for key, value := range nodeMap { if len(value) < 4 { - continue // or handle error + continue } expiry, ok := value[3].(int64) @@ -106,12 +107,12 @@ func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeI addresses, ok := value[1].([]string) if !ok { - continue // or handle error + continue } storageLocation := NewStorageLocation(t, addresses, expiry) if len(value) > 4 { - if providerMessage, ok := value[4].(string); ok { + if providerMessage, ok := value[4].([]byte); ok { storageLocation.ProviderMessage = providerMessage } } @@ -119,103 +120,60 @@ func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeI locations[NodeId(key)] = storageLocation } } - return locations, nil } -func (n *Node) ReadStorageLocationsFromDB(hash Multihash) (map[int]map[NodeId]map[int]interface{}, error) { - locations := make(map[int]map[NodeId]map[int]interface{}) +func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storageLocationMap, error) { + locationMap := newStorageLocationMap() - bytes, err := n.config.CacheDb.Get(StringifyHash(hash)) // Assume StringifyHash and CacheDb.Get are implemented - if err != nil { - return locations, nil - } + bytes := n.cacheBucket.Get(hash.FullBytes()) if bytes == nil { - return locations, nil + return locationMap, nil } - unpacker := NewUnpacker(bytes) // Assume NewUnpacker is implemented to handle the unpacking - mapLength, err := unpacker.UnpackMapLength() + err := msgpack.Unmarshal(bytes, locationMap) if err != nil { return nil, err } - for i := 0; i < mapLength; i++ { - t, err := unpacker.UnpackInt() - if err != nil { - continue // or handle error - } - - innerMap := make(map[NodeId]map[int]interface{}) - locations[t] = innerMap - - innerMapLength, err := unpacker.UnpackMapLength() - if err != nil { - continue // or handle error - } - - for j := 0; j < innerMapLength; j++ { - nodeIdBytes, err := unpacker.UnpackBinary() - if err != nil { - continue // or handle error - } - nodeId := NodeId(nodeIdBytes) - - // Assuming unpacker.UnpackMap() returns a map[string]interface{} and is implemented - unpackedMap, err := unpacker.UnpackMap() - if err != nil { - continue // or handle error - } - - convertedMap := make(map[int]interface{}) - for key, value := range unpackedMap { - intKey, err := strconv.Atoi(key) - if err != nil { - continue // or handle error - } - convertedMap[intKey] = value - } - innerMap[nodeId] = convertedMap - } - } - return locations, nil + return locationMap, nil } -func (n *Node) AddStorageLocation(hash Multihash, nodeId NodeId, location StorageLocation, message []byte, config S5Config) error { +func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location *StorageLocation, message []byte, config *NodeConfig) error { // Read existing storage locations - mapFromDB, err := n.ReadStorageLocationsFromDB(hash) + locationDb, err := n.readStorageLocationsFromDB(hash) if err != nil { return err } // Get or create the inner map for the specific type - innerMap, exists := mapFromDB[location.Type] + innerMap, exists := locationDb[location.Type] if !exists { - innerMap = make(map[NodeId]map[int]interface{}) - mapFromDB[location.Type] = innerMap + innerMap = make(nodeStorage, 1) + innerMap[nodeId.HashCode()] = make(nodeDetailsStorage, 1) } // Create location map with new data - locationMap := make(map[int]interface{}) + locationMap := make(map[int]interface{}, 3) locationMap[1] = location.Parts - // locationMap[2] = location.BinaryParts // Uncomment if BinaryParts is a field of StorageLocation locationMap[3] = location.Expiry locationMap[4] = message // Update the inner map with the new location - innerMap[nodeId] = locationMap + innerMap[nodeId.HashCode()] = locationMap + locationDb[location.Type] = innerMap // Serialize the updated map and store it in the database - packedBytes, err := NewPacker().Pack(mapFromDB) // Assuming NewPacker and Pack are implemented + packedBytes, err := msgpack.Marshal(locationDb) if err != nil { return err } - err = config.CacheDb.Put(StringifyHash(hash), packedBytes) // Assume CacheDb.Put and StringifyHash are implemented + err = n.cacheBucket.Put(hash.FullBytes(), packedBytes) if err != nil { return err } return nil -} +} /* func (n *Node) DownloadBytesByHash(hash Multihash) ([]byte, error) { dlUriProvider := NewStorageLocationProvider(n, hash, []int{storageLocationTypeFull, storageLocationTypeFile}) diff --git a/protocol/storage_location.go b/protocol/storage_location.go new file mode 100644 index 0000000..1c4bd87 --- /dev/null +++ b/protocol/storage_location.go @@ -0,0 +1,109 @@ +package protocol + +import ( + "crypto/ed25519" + "fmt" + libs5_go "git.lumeweb.com/LumeWeb/libs5-go" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "git.lumeweb.com/LumeWeb/libs5-go/utils" + "github.com/emirpasic/gods/sets/hashset" + "github.com/vmihailenco/msgpack/v5" + "go.uber.org/zap" +) + +var _ IncomingMessageTyped = (*StorageLocation)(nil) + +type StorageLocation struct { + raw []byte + hash *encoding.Multihash + kind int + expiry int64 + parts []string + publicKey []byte + signature []byte + + IncomingMessageTypedImpl + IncomingMessageHandler +} + +func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder) error { + data, err := dec.DecodeRaw() + + if err != nil { + return err + } + + s.raw = data + + return nil +} +func (s *StorageLocation) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { + hash := encoding.NewMultihash(s.raw[1:34]) // Replace NewMultihash with appropriate function + fmt.Println("Hash:", hash) + + typeOfData := s.raw[34] + + expiry := utils.DecodeEndian(s.raw[35:39]) + + partCount := s.raw[39] + + parts := []string{} + cursor := 40 + for i := 0; i < int(partCount); i++ { + length := utils.DecodeEndian(s.raw[cursor : cursor+2]) + cursor += 2 + part := string(s.raw[cursor : cursor+int(length)]) + parts = append(parts, part) + cursor += int(length) + } + + publicKey := s.raw[cursor : cursor+33] + signature := s.raw[cursor+33:] + + if types.HashType(publicKey[0]) != types.HashTypeEd25519 { // Replace CID_HASH_TYPES_ED25519 with actual constant + return fmt.Errorf("Unsupported public key type %d", publicKey[0]) + } + + if !ed25519.Verify(publicKey[1:], s.raw[:cursor], signature) { + return fmt.Errorf("Signature verification failed") + } + + nodeId := encoding.NewNodeId(publicKey) + + // Assuming `node` is an instance of your Node structure + err := node.AddStorageLocation(hash, nodeId, libs5_go.NewStorageLocation(int(typeOfData), parts, int64(expiry)), s.raw, node.Config()) // Implement AddStorageLocation + + if err != nil { + return fmt.Errorf("Failed to add storage location: %s", err) + } + + var list *hashset.Set + listVal, ok := node.HashQueryRoutingTable().Get(hash.HashCode()) // Implement HashQueryRoutingTable method + if !ok { + list = hashset.New() + } + + list = listVal.(*hashset.Set) + + for _, peerIdVal := range list.Values() { + peerId := peerIdVal.(*encoding.NodeId) + + if peerId.Equals(nodeId) || peerId.Equals(peer) { + continue + } + if peerVal, ok := node.Services().P2P().Peers().Get(peerId.HashCode()); ok { + foundPeer := peerVal.(net.Peer) + err := foundPeer.SendMessage(s.raw) + if err != nil { + node.Logger().Error("Failed to send message", zap.Error(err)) + continue + } + } + + node.HashQueryRoutingTable().Remove(hash.HashCode()) + } + + return nil +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..44c44d6 --- /dev/null +++ b/storage.go @@ -0,0 +1,112 @@ +package libs5_go + +import ( + "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "github.com/vmihailenco/msgpack/v5" + "strconv" + "time" +) + +var ( + _ msgpack.CustomDecoder = (*storageLocationMap)(nil) +) + +type StorageLocation struct { + Type int + Parts []string + BinaryParts [][]byte + Expiry int64 + ProviderMessage []byte +} + +func NewStorageLocation(Type int, Parts []string, Expiry int64) *StorageLocation { + return &StorageLocation{ + Type: Type, + Parts: Parts, + Expiry: Expiry, + } +} + +func (s *StorageLocation) BytesURL() string { + return s.Parts[0] +} + +func (s *StorageLocation) OutboardBytesURL() string { + if len(s.Parts) == 1 { + return s.Parts[0] + ".obao" + } + return s.Parts[1] +} + +func (s *StorageLocation) String() string { + expiryDate := time.Unix(s.Expiry, 0) + return "StorageLocation(" + strconv.Itoa(s.Type) + ", " + fmt.Sprint(s.Parts) + ", expiry: " + expiryDate.Format(time.RFC3339) + ")" +} + +type SignedStorageLocation struct { + NodeID encoding.NodeId + Location StorageLocation +} + +func NewSignedStorageLocation(NodeID encoding.NodeId, Location StorageLocation) *SignedStorageLocation { + return &SignedStorageLocation{ + NodeID: NodeID, + Location: Location, + } +} + +func (ssl *SignedStorageLocation) String() string { + nodeString, _ := ssl.NodeID.ToString() + + if nodeString == "" { + nodeString = "failed to decode node id" + } + + return "SignedStorageLocation(" + ssl.Location.String() + ", " + nodeString + ")" +} + +type storageLocationMap map[int]nodeStorage +type nodeStorage map[encoding.NodeIdCode]nodeDetailsStorage +type nodeDetailsStorage map[int]interface{} + +func (s *storageLocationMap) DecodeMsgpack(dec *msgpack.Decoder) error { + temp, err := dec.DecodeUntypedMap() + if err != nil { + return err + } + + if *s == nil { + *s = make(map[int]nodeStorage) + } + + tempMap, ok := interface{}(temp).(storageLocationMap) + if !ok { + return fmt.Errorf("unexpected data format from msgpack decoding") + } + + *s = tempMap + + return nil +} + +func (s storageLocationMap) EncodeMsgpack(enc *msgpack.Encoder) error { + // Create a temporary map to hold the encoded data + tempMap := make(map[int]map[encoding.NodeIdCode]map[int]interface{}) + + // Populate the temporary map with data from storageLocationMap + for storageKey, nodeStorages := range s { + tempNodeStorages := make(map[encoding.NodeIdCode]map[int]interface{}) + for nodeId, nodeDetails := range nodeStorages { + tempNodeStorages[nodeId] = nodeDetails + } + tempMap[storageKey] = tempNodeStorages + } + + // Encode the temporary map using MessagePack + return enc.Encode(tempMap) +} + +func newStorageLocationMap() storageLocationMap { + return storageLocationMap{} +}