2024-01-06 18:21:09 +00:00
|
|
|
package node
|
2024-01-06 11:33:46 +00:00
|
|
|
|
|
|
|
import (
|
2024-01-06 18:26:03 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/config"
|
2024-01-06 14:46:01 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
2024-01-07 08:13:35 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
|
2024-01-06 11:33:46 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/structs"
|
2024-01-06 14:46:01 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/utils"
|
|
|
|
"github.com/vmihailenco/msgpack/v5"
|
2024-01-06 11:33:46 +00:00
|
|
|
bolt "go.etcd.io/bbolt"
|
|
|
|
"go.uber.org/zap"
|
2024-01-06 14:46:01 +00:00
|
|
|
"time"
|
2024-01-06 11:33:46 +00:00
|
|
|
)
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
// Compile-time assertion that *NodeImpl satisfies interfaces.Node.
var _ interfaces.Node = (*NodeImpl)(nil)
|
2024-01-06 14:46:01 +00:00
|
|
|
|
|
|
|
// cacheBucketName is the bolt bucket name that Start passes to
// utils.CreateBucket.
const cacheBucketName = "object-cache"
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
type NodeImpl struct {
|
2024-01-06 18:26:03 +00:00
|
|
|
nodeConfig *config.NodeConfig
|
2024-01-06 11:33:46 +00:00
|
|
|
metadataCache *structs.Map
|
|
|
|
started bool
|
|
|
|
hashQueryRoutingTable *structs.Map
|
2024-01-07 08:13:35 +00:00
|
|
|
services interfaces.Services
|
2024-01-06 14:46:01 +00:00
|
|
|
cacheBucket *bolt.Bucket
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Services() *interfaces.Services {
|
2024-01-06 14:46:01 +00:00
|
|
|
return &n.services
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func NewNode(config *config.NodeConfig) *NodeImpl {
|
|
|
|
return &NodeImpl{
|
2024-01-06 11:33:46 +00:00
|
|
|
nodeConfig: config,
|
|
|
|
metadataCache: structs.NewMap(),
|
|
|
|
started: false,
|
|
|
|
hashQueryRoutingTable: structs.NewMap(),
|
|
|
|
}
|
|
|
|
}
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) HashQueryRoutingTable() *structs.Map {
|
2024-01-06 11:33:46 +00:00
|
|
|
return n.hashQueryRoutingTable
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) IsStarted() bool {
|
2024-01-06 11:33:46 +00:00
|
|
|
return n.started
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Config() *config.NodeConfig {
|
2024-01-06 11:33:46 +00:00
|
|
|
return n.nodeConfig
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Logger() *zap.Logger {
|
2024-01-06 11:33:46 +00:00
|
|
|
if n.nodeConfig != nil {
|
|
|
|
return n.nodeConfig.Logger
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Db() *bolt.DB {
|
2024-01-06 11:33:46 +00:00
|
|
|
if n.nodeConfig != nil {
|
2024-01-06 18:15:45 +00:00
|
|
|
return n.nodeConfig.DB
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Start() error {
|
2024-01-06 14:46:01 +00:00
|
|
|
err :=
|
|
|
|
utils.CreateBucket(cacheBucketName, n.Db(), func(bucket *bolt.Bucket) {
|
|
|
|
n.cacheBucket = bucket
|
|
|
|
})
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
n.started = true
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-01-06 11:33:46 +00:00
|
|
|
/*
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Services() *S5Services {
|
2024-01-06 14:45:00 +00:00
|
|
|
if n.nodeConfig != nil {
|
|
|
|
return n.nodeConfig.Services
|
|
|
|
}
|
|
|
|
return nil
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Start() error {
|
2024-01-06 14:45:00 +00:00
|
|
|
n.started = true
|
|
|
|
return nil
|
|
|
|
}
|
2024-01-06 11:33:46 +00:00
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) Stop() error {
|
2024-01-06 14:45:00 +00:00
|
|
|
n.started = false
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
*/
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, types []int) (map[string]*interfaces.StorageLocation, error) {
|
|
|
|
locations := make(map[string]*interfaces.StorageLocation)
|
2024-01-06 11:33:46 +00:00
|
|
|
|
2024-01-06 14:45:00 +00:00
|
|
|
locationMap, err := n.readStorageLocationsFromDB(hash)
|
2024-01-06 11:33:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2024-01-06 14:45:00 +00:00
|
|
|
if len(locationMap) == 0 {
|
2024-01-07 08:13:35 +00:00
|
|
|
return make(map[string]*interfaces.StorageLocation), nil
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
ts := time.Now().Unix()
|
|
|
|
|
|
|
|
for _, t := range types {
|
2024-01-06 14:45:00 +00:00
|
|
|
|
|
|
|
nodeMap, ok := (locationMap)[t]
|
2024-01-06 11:33:46 +00:00
|
|
|
if !ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for key, value := range nodeMap {
|
|
|
|
if len(value) < 4 {
|
2024-01-06 14:45:00 +00:00
|
|
|
continue
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
expiry, ok := value[3].(int64)
|
|
|
|
if !ok || expiry < ts {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
addresses, ok := value[1].([]string)
|
|
|
|
if !ok {
|
2024-01-06 14:45:00 +00:00
|
|
|
continue
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
storageLocation := NewStorageLocation(t, addresses, expiry)
|
|
|
|
if len(value) > 4 {
|
2024-01-06 14:45:00 +00:00
|
|
|
if providerMessage, ok := value[4].([]byte); ok {
|
2024-01-07 08:13:35 +00:00
|
|
|
(*storageLocation).SetProviderMessage(providerMessage)
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-06 14:46:01 +00:00
|
|
|
locations[key] = storageLocation
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return locations, nil
|
|
|
|
}
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storageLocationMap, error) {
|
2024-01-06 18:26:03 +00:00
|
|
|
locationMap := newStorageLocationMap()
|
2024-01-06 11:33:46 +00:00
|
|
|
|
2024-01-06 14:45:00 +00:00
|
|
|
bytes := n.cacheBucket.Get(hash.FullBytes())
|
2024-01-06 11:33:46 +00:00
|
|
|
if bytes == nil {
|
2024-01-06 14:45:00 +00:00
|
|
|
return locationMap, nil
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
2024-01-06 14:45:00 +00:00
|
|
|
err := msgpack.Unmarshal(bytes, locationMap)
|
2024-01-06 11:33:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-01-06 14:45:00 +00:00
|
|
|
return locationMap, nil
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location *interfaces.StorageLocation, message []byte, config *config.NodeConfig) error {
|
2024-01-06 11:33:46 +00:00
|
|
|
// Read existing storage locations
|
2024-01-06 14:45:00 +00:00
|
|
|
locationDb, err := n.readStorageLocationsFromDB(hash)
|
2024-01-06 11:33:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-01-06 15:53:20 +00:00
|
|
|
nodeIdStr, err := nodeId.ToString()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-01-06 11:33:46 +00:00
|
|
|
// Get or create the inner map for the specific type
|
2024-01-07 08:13:35 +00:00
|
|
|
innerMap, exists := locationDb[(*location).Type()]
|
2024-01-06 11:33:46 +00:00
|
|
|
if !exists {
|
2024-01-06 18:26:03 +00:00
|
|
|
innerMap = make(nodeStorage, 1)
|
|
|
|
innerMap[nodeIdStr] = make(nodeDetailsStorage, 1)
|
2024-01-06 11:33:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Create location map with new data
|
2024-01-06 14:45:00 +00:00
|
|
|
locationMap := make(map[int]interface{}, 3)
|
2024-01-07 08:13:35 +00:00
|
|
|
locationMap[1] = (*location).Parts
|
|
|
|
locationMap[3] = (*location).Expiry
|
2024-01-06 11:33:46 +00:00
|
|
|
locationMap[4] = message
|
|
|
|
|
|
|
|
// Update the inner map with the new location
|
2024-01-06 15:53:20 +00:00
|
|
|
innerMap[nodeIdStr] = locationMap
|
2024-01-07 08:13:35 +00:00
|
|
|
locationDb[(*location).Type()] = innerMap
|
2024-01-06 11:33:46 +00:00
|
|
|
|
|
|
|
// Serialize the updated map and store it in the database
|
2024-01-06 14:45:00 +00:00
|
|
|
packedBytes, err := msgpack.Marshal(locationDb)
|
2024-01-06 11:33:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-01-06 14:45:00 +00:00
|
|
|
err = n.cacheBucket.Put(hash.FullBytes(), packedBytes)
|
2024-01-06 11:33:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2024-01-06 14:45:00 +00:00
|
|
|
} /*
|
2024-01-06 11:33:46 +00:00
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) DownloadBytesByHash(hash Multihash) ([]byte, error) {
|
2024-01-06 11:33:46 +00:00
|
|
|
dlUriProvider := NewStorageLocationProvider(n, hash, []int{storageLocationTypeFull, storageLocationTypeFile})
|
|
|
|
dlUriProvider.Start()
|
|
|
|
|
|
|
|
retryCount := 0
|
|
|
|
for {
|
|
|
|
dlUri, err := dlUriProvider.Next()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
n.Logger.Verbose(fmt.Sprintf("[try] %s", dlUri.Location.BytesUrl))
|
|
|
|
|
|
|
|
client := &http.Client{
|
|
|
|
Timeout: 30 * time.Second,
|
|
|
|
}
|
|
|
|
res, err := client.Get(dlUri.Location.BytesUrl)
|
|
|
|
if err != nil {
|
|
|
|
n.Logger.Catched(err)
|
|
|
|
|
|
|
|
dlUriProvider.Downvote(dlUri)
|
|
|
|
|
|
|
|
retryCount++
|
|
|
|
if retryCount > 32 {
|
|
|
|
return nil, errors.New("too many retries")
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
defer res.Body.Close()
|
|
|
|
|
|
|
|
data, err := ioutil.ReadAll(res.Body)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Assuming blake3 and equalBytes functions are available
|
|
|
|
resHash := blake3(data)
|
|
|
|
|
|
|
|
if !equalBytes(hash.HashBytes, resHash) {
|
|
|
|
dlUriProvider.Downvote(dlUri)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
dlUriProvider.Upvote(dlUri)
|
|
|
|
return data, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
func (n *NodeImpl) GetMetadataByCID(cid CID) (Metadata, error) {
|
2024-01-06 11:33:46 +00:00
|
|
|
var metadata Metadata
|
|
|
|
var ok bool
|
|
|
|
|
|
|
|
if metadata, ok = n.MetadataCache[cid.Hash]; !ok {
|
|
|
|
bytes, err := n.DownloadBytesByHash(cid.Hash)
|
|
|
|
if err != nil {
|
|
|
|
return Metadata{}, err
|
|
|
|
}
|
|
|
|
|
2024-01-07 08:13:35 +00:00
|
|
|
switch cid.kind {
|
2024-01-06 11:33:46 +00:00
|
|
|
case METADATA_MEDIA, BRIDGE: // Both cases use the same deserialization method
|
|
|
|
metadata, err = deserializeMediaMetadata(bytes)
|
|
|
|
case METADATA_WEBAPP:
|
|
|
|
metadata, err = deserializeWebAppMetadata(bytes)
|
|
|
|
default:
|
|
|
|
return Metadata{}, errors.New("unsupported metadata format")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return Metadata{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
n.MetadataCache[cid.Hash] = metadata
|
|
|
|
}
|
|
|
|
|
|
|
|
return metadata, nil
|
|
|
|
}
|
|
|
|
*/
|