feat: wip initial storage location support
This commit is contained in:
parent
f45e297791
commit
4959270f51
124
node.go
124
node.go
|
@ -58,45 +58,46 @@ func (n *Node) Db() *bolt.DB {
|
|||
}
|
||||
|
||||
/*
|
||||
func (n *Node) Services() *S5Services {
|
||||
if n.nodeConfig != nil {
|
||||
return n.nodeConfig.Services
|
||||
func (n *Node) Services() *S5Services {
|
||||
if n.nodeConfig != nil {
|
||||
return n.nodeConfig.Services
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) Start() error {
|
||||
n.started = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) Start() error {
|
||||
n.started = true
|
||||
return nil
|
||||
}
|
||||
func (n *Node) Stop() error {
|
||||
n.started = false
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, types []int) (map[encoding.NodeIdCode]*StorageLocation, error) {
|
||||
locations := make(map[encoding.NodeIdCode]*StorageLocation)
|
||||
|
||||
func (n *Node) Stop() error {
|
||||
n.started = false
|
||||
return nil
|
||||
}
|
||||
func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeId]*StorageLocation, error) {
|
||||
locations := make(map[NodeId]*StorageLocation)
|
||||
|
||||
mapFromDB, err := n.readStorageLocationsFromDB(hash)
|
||||
locationMap, err := n.readStorageLocationsFromDB(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(mapFromDB) == 0 {
|
||||
return make(map[NodeId]*StorageLocation), nil
|
||||
if len(locationMap) == 0 {
|
||||
return make(map[encoding.NodeIdCode]*StorageLocation), nil
|
||||
}
|
||||
|
||||
ts := time.Now().Unix()
|
||||
|
||||
for _, t := range types {
|
||||
nodeMap, ok := mapFromDB[t]
|
||||
|
||||
nodeMap, ok := (locationMap)[t]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for key, value := range nodeMap {
|
||||
if len(value) < 4 {
|
||||
continue // or handle error
|
||||
continue
|
||||
}
|
||||
|
||||
expiry, ok := value[3].(int64)
|
||||
|
@ -106,12 +107,12 @@ func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeI
|
|||
|
||||
addresses, ok := value[1].([]string)
|
||||
if !ok {
|
||||
continue // or handle error
|
||||
continue
|
||||
}
|
||||
|
||||
storageLocation := NewStorageLocation(t, addresses, expiry)
|
||||
if len(value) > 4 {
|
||||
if providerMessage, ok := value[4].(string); ok {
|
||||
if providerMessage, ok := value[4].([]byte); ok {
|
||||
storageLocation.ProviderMessage = providerMessage
|
||||
}
|
||||
}
|
||||
|
@ -119,103 +120,60 @@ func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeI
|
|||
locations[NodeId(key)] = storageLocation
|
||||
}
|
||||
}
|
||||
|
||||
return locations, nil
|
||||
}
|
||||
func (n *Node) ReadStorageLocationsFromDB(hash Multihash) (map[int]map[NodeId]map[int]interface{}, error) {
|
||||
locations := make(map[int]map[NodeId]map[int]interface{})
|
||||
func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storageLocationMap, error) {
|
||||
locationMap := newStorageLocationMap()
|
||||
|
||||
bytes, err := n.config.CacheDb.Get(StringifyHash(hash)) // Assume StringifyHash and CacheDb.Get are implemented
|
||||
if err != nil {
|
||||
return locations, nil
|
||||
}
|
||||
bytes := n.cacheBucket.Get(hash.FullBytes())
|
||||
if bytes == nil {
|
||||
return locations, nil
|
||||
return locationMap, nil
|
||||
}
|
||||
|
||||
unpacker := NewUnpacker(bytes) // Assume NewUnpacker is implemented to handle the unpacking
|
||||
mapLength, err := unpacker.UnpackMapLength()
|
||||
err := msgpack.Unmarshal(bytes, locationMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := 0; i < mapLength; i++ {
|
||||
t, err := unpacker.UnpackInt()
|
||||
if err != nil {
|
||||
continue // or handle error
|
||||
}
|
||||
|
||||
innerMap := make(map[NodeId]map[int]interface{})
|
||||
locations[t] = innerMap
|
||||
|
||||
innerMapLength, err := unpacker.UnpackMapLength()
|
||||
if err != nil {
|
||||
continue // or handle error
|
||||
}
|
||||
|
||||
for j := 0; j < innerMapLength; j++ {
|
||||
nodeIdBytes, err := unpacker.UnpackBinary()
|
||||
if err != nil {
|
||||
continue // or handle error
|
||||
}
|
||||
nodeId := NodeId(nodeIdBytes)
|
||||
|
||||
// Assuming unpacker.UnpackMap() returns a map[string]interface{} and is implemented
|
||||
unpackedMap, err := unpacker.UnpackMap()
|
||||
if err != nil {
|
||||
continue // or handle error
|
||||
}
|
||||
|
||||
convertedMap := make(map[int]interface{})
|
||||
for key, value := range unpackedMap {
|
||||
intKey, err := strconv.Atoi(key)
|
||||
if err != nil {
|
||||
continue // or handle error
|
||||
}
|
||||
convertedMap[intKey] = value
|
||||
}
|
||||
innerMap[nodeId] = convertedMap
|
||||
}
|
||||
}
|
||||
return locations, nil
|
||||
return locationMap, nil
|
||||
}
|
||||
func (n *Node) AddStorageLocation(hash Multihash, nodeId NodeId, location StorageLocation, message []byte, config S5Config) error {
|
||||
func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location *StorageLocation, message []byte, config *NodeConfig) error {
|
||||
// Read existing storage locations
|
||||
mapFromDB, err := n.ReadStorageLocationsFromDB(hash)
|
||||
locationDb, err := n.readStorageLocationsFromDB(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get or create the inner map for the specific type
|
||||
innerMap, exists := mapFromDB[location.Type]
|
||||
innerMap, exists := locationDb[location.Type]
|
||||
if !exists {
|
||||
innerMap = make(map[NodeId]map[int]interface{})
|
||||
mapFromDB[location.Type] = innerMap
|
||||
innerMap = make(nodeStorage, 1)
|
||||
innerMap[nodeId.HashCode()] = make(nodeDetailsStorage, 1)
|
||||
}
|
||||
|
||||
// Create location map with new data
|
||||
locationMap := make(map[int]interface{})
|
||||
locationMap := make(map[int]interface{}, 3)
|
||||
locationMap[1] = location.Parts
|
||||
// locationMap[2] = location.BinaryParts // Uncomment if BinaryParts is a field of StorageLocation
|
||||
locationMap[3] = location.Expiry
|
||||
locationMap[4] = message
|
||||
|
||||
// Update the inner map with the new location
|
||||
innerMap[nodeId] = locationMap
|
||||
innerMap[nodeId.HashCode()] = locationMap
|
||||
locationDb[location.Type] = innerMap
|
||||
|
||||
// Serialize the updated map and store it in the database
|
||||
packedBytes, err := NewPacker().Pack(mapFromDB) // Assuming NewPacker and Pack are implemented
|
||||
packedBytes, err := msgpack.Marshal(locationDb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = config.CacheDb.Put(StringifyHash(hash), packedBytes) // Assume CacheDb.Put and StringifyHash are implemented
|
||||
err = n.cacheBucket.Put(hash.FullBytes(), packedBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
} /*
|
||||
|
||||
func (n *Node) DownloadBytesByHash(hash Multihash) ([]byte, error) {
|
||||
dlUriProvider := NewStorageLocationProvider(n, hash, []int{storageLocationTypeFull, storageLocationTypeFile})
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
package protocol
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"fmt"
|
||||
libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
|
||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||
"git.lumeweb.com/LumeWeb/libs5-go/net"
|
||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||
"git.lumeweb.com/LumeWeb/libs5-go/utils"
|
||||
"github.com/emirpasic/gods/sets/hashset"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ IncomingMessageTyped = (*StorageLocation)(nil)
|
||||
|
||||
type StorageLocation struct {
|
||||
raw []byte
|
||||
hash *encoding.Multihash
|
||||
kind int
|
||||
expiry int64
|
||||
parts []string
|
||||
publicKey []byte
|
||||
signature []byte
|
||||
|
||||
IncomingMessageTypedImpl
|
||||
IncomingMessageHandler
|
||||
}
|
||||
|
||||
func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder) error {
|
||||
data, err := dec.DecodeRaw()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.raw = data
|
||||
|
||||
return nil
|
||||
}
|
||||
func (s *StorageLocation) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
|
||||
hash := encoding.NewMultihash(s.raw[1:34]) // Replace NewMultihash with appropriate function
|
||||
fmt.Println("Hash:", hash)
|
||||
|
||||
typeOfData := s.raw[34]
|
||||
|
||||
expiry := utils.DecodeEndian(s.raw[35:39])
|
||||
|
||||
partCount := s.raw[39]
|
||||
|
||||
parts := []string{}
|
||||
cursor := 40
|
||||
for i := 0; i < int(partCount); i++ {
|
||||
length := utils.DecodeEndian(s.raw[cursor : cursor+2])
|
||||
cursor += 2
|
||||
part := string(s.raw[cursor : cursor+int(length)])
|
||||
parts = append(parts, part)
|
||||
cursor += int(length)
|
||||
}
|
||||
|
||||
publicKey := s.raw[cursor : cursor+33]
|
||||
signature := s.raw[cursor+33:]
|
||||
|
||||
if types.HashType(publicKey[0]) != types.HashTypeEd25519 { // Replace CID_HASH_TYPES_ED25519 with actual constant
|
||||
return fmt.Errorf("Unsupported public key type %d", publicKey[0])
|
||||
}
|
||||
|
||||
if !ed25519.Verify(publicKey[1:], s.raw[:cursor], signature) {
|
||||
return fmt.Errorf("Signature verification failed")
|
||||
}
|
||||
|
||||
nodeId := encoding.NewNodeId(publicKey)
|
||||
|
||||
// Assuming `node` is an instance of your Node structure
|
||||
err := node.AddStorageLocation(hash, nodeId, libs5_go.NewStorageLocation(int(typeOfData), parts, int64(expiry)), s.raw, node.Config()) // Implement AddStorageLocation
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to add storage location: %s", err)
|
||||
}
|
||||
|
||||
var list *hashset.Set
|
||||
listVal, ok := node.HashQueryRoutingTable().Get(hash.HashCode()) // Implement HashQueryRoutingTable method
|
||||
if !ok {
|
||||
list = hashset.New()
|
||||
}
|
||||
|
||||
list = listVal.(*hashset.Set)
|
||||
|
||||
for _, peerIdVal := range list.Values() {
|
||||
peerId := peerIdVal.(*encoding.NodeId)
|
||||
|
||||
if peerId.Equals(nodeId) || peerId.Equals(peer) {
|
||||
continue
|
||||
}
|
||||
if peerVal, ok := node.Services().P2P().Peers().Get(peerId.HashCode()); ok {
|
||||
foundPeer := peerVal.(net.Peer)
|
||||
err := foundPeer.SendMessage(s.raw)
|
||||
if err != nil {
|
||||
node.Logger().Error("Failed to send message", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
node.HashQueryRoutingTable().Remove(hash.HashCode())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
package libs5_go
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
_ msgpack.CustomDecoder = (*storageLocationMap)(nil)
|
||||
)
|
||||
|
||||
type StorageLocation struct {
|
||||
Type int
|
||||
Parts []string
|
||||
BinaryParts [][]byte
|
||||
Expiry int64
|
||||
ProviderMessage []byte
|
||||
}
|
||||
|
||||
func NewStorageLocation(Type int, Parts []string, Expiry int64) *StorageLocation {
|
||||
return &StorageLocation{
|
||||
Type: Type,
|
||||
Parts: Parts,
|
||||
Expiry: Expiry,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StorageLocation) BytesURL() string {
|
||||
return s.Parts[0]
|
||||
}
|
||||
|
||||
func (s *StorageLocation) OutboardBytesURL() string {
|
||||
if len(s.Parts) == 1 {
|
||||
return s.Parts[0] + ".obao"
|
||||
}
|
||||
return s.Parts[1]
|
||||
}
|
||||
|
||||
func (s *StorageLocation) String() string {
|
||||
expiryDate := time.Unix(s.Expiry, 0)
|
||||
return "StorageLocation(" + strconv.Itoa(s.Type) + ", " + fmt.Sprint(s.Parts) + ", expiry: " + expiryDate.Format(time.RFC3339) + ")"
|
||||
}
|
||||
|
||||
type SignedStorageLocation struct {
|
||||
NodeID encoding.NodeId
|
||||
Location StorageLocation
|
||||
}
|
||||
|
||||
func NewSignedStorageLocation(NodeID encoding.NodeId, Location StorageLocation) *SignedStorageLocation {
|
||||
return &SignedStorageLocation{
|
||||
NodeID: NodeID,
|
||||
Location: Location,
|
||||
}
|
||||
}
|
||||
|
||||
func (ssl *SignedStorageLocation) String() string {
|
||||
nodeString, _ := ssl.NodeID.ToString()
|
||||
|
||||
if nodeString == "" {
|
||||
nodeString = "failed to decode node id"
|
||||
}
|
||||
|
||||
return "SignedStorageLocation(" + ssl.Location.String() + ", " + nodeString + ")"
|
||||
}
|
||||
|
||||
type storageLocationMap map[int]nodeStorage
|
||||
type nodeStorage map[encoding.NodeIdCode]nodeDetailsStorage
|
||||
type nodeDetailsStorage map[int]interface{}
|
||||
|
||||
func (s *storageLocationMap) DecodeMsgpack(dec *msgpack.Decoder) error {
|
||||
temp, err := dec.DecodeUntypedMap()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if *s == nil {
|
||||
*s = make(map[int]nodeStorage)
|
||||
}
|
||||
|
||||
tempMap, ok := interface{}(temp).(storageLocationMap)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected data format from msgpack decoding")
|
||||
}
|
||||
|
||||
*s = tempMap
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s storageLocationMap) EncodeMsgpack(enc *msgpack.Encoder) error {
|
||||
// Create a temporary map to hold the encoded data
|
||||
tempMap := make(map[int]map[encoding.NodeIdCode]map[int]interface{})
|
||||
|
||||
// Populate the temporary map with data from storageLocationMap
|
||||
for storageKey, nodeStorages := range s {
|
||||
tempNodeStorages := make(map[encoding.NodeIdCode]map[int]interface{})
|
||||
for nodeId, nodeDetails := range nodeStorages {
|
||||
tempNodeStorages[nodeId] = nodeDetails
|
||||
}
|
||||
tempMap[storageKey] = tempNodeStorages
|
||||
}
|
||||
|
||||
// Encode the temporary map using MessagePack
|
||||
return enc.Encode(tempMap)
|
||||
}
|
||||
|
||||
func newStorageLocationMap() storageLocationMap {
|
||||
return storageLocationMap{}
|
||||
}
|
Loading…
Reference in New Issue