libs5-go/node/node.go

296 lines
6.4 KiB
Go
Raw Normal View History

package node
2024-01-06 11:33:46 +00:00
import (
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
2024-01-06 11:33:46 +00:00
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/go-resty/resty/v2"
"github.com/vmihailenco/msgpack/v5"
2024-01-06 11:33:46 +00:00
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"log"
"sync"
"time"
2024-01-06 11:33:46 +00:00
)
var _ interfaces.Node = (*NodeImpl)(nil)
const cacheBucketName = "object-cache"
type NodeImpl struct {
nodeConfig *config.NodeConfig
metadataCache structs.Map
2024-01-06 11:33:46 +00:00
started bool
hashQueryRoutingTable structs.Map
services interfaces.Services
cacheBucket *bolt.Bucket
httpClient *resty.Client
connections sync.WaitGroup
}
2024-01-07 14:02:39 +00:00
func (n *NodeImpl) NetworkId() string {
return n.nodeConfig.P2P.Network
}
func (n *NodeImpl) Services() interfaces.Services {
return n.services
2024-01-06 11:33:46 +00:00
}
func NewNode(config *config.NodeConfig) interfaces.Node {
n := &NodeImpl{
2024-01-06 11:33:46 +00:00
nodeConfig: config,
metadataCache: structs.NewMap(),
started: false,
hashQueryRoutingTable: structs.NewMap(),
httpClient: resty.New(),
2024-01-06 11:33:46 +00:00
}
n.services = NewServices(service.NewP2P(n))
return n
2024-01-06 11:33:46 +00:00
}
func (n *NodeImpl) HashQueryRoutingTable() structs.Map {
2024-01-06 11:33:46 +00:00
return n.hashQueryRoutingTable
}
func (n *NodeImpl) IsStarted() bool {
2024-01-06 11:33:46 +00:00
return n.started
}
func (n *NodeImpl) Config() *config.NodeConfig {
2024-01-06 11:33:46 +00:00
return n.nodeConfig
}
func (n *NodeImpl) Logger() *zap.Logger {
2024-01-06 11:33:46 +00:00
if n.nodeConfig != nil {
return n.nodeConfig.Logger
}
return nil
}
func (n *NodeImpl) Db() *bolt.DB {
2024-01-06 11:33:46 +00:00
if n.nodeConfig != nil {
2024-01-06 18:15:45 +00:00
return n.nodeConfig.DB
2024-01-06 11:33:46 +00:00
}
return nil
}
func (n *NodeImpl) Start() error {
err :=
utils.CreateBucket(cacheBucketName, n.Db(), func(bucket *bolt.Bucket) {
n.cacheBucket = bucket
})
if err != nil {
return err
}
2024-01-07 10:25:45 +00:00
n.started = true
err = n.Services().P2P().Init()
if err != nil {
return err
}
err = n.Services().P2P().Start()
if err != nil {
return err
}
n.started = true
return nil
}
func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]interfaces.StorageLocation, error) {
locations := make(map[string]interfaces.StorageLocation)
2024-01-06 11:33:46 +00:00
locationMap, err := n.readStorageLocationsFromDB(hash)
2024-01-06 11:33:46 +00:00
if err != nil {
return nil, err
}
if len(locationMap) == 0 {
return make(map[string]interfaces.StorageLocation), nil
2024-01-06 11:33:46 +00:00
}
ts := time.Now().Unix()
for _, t := range kinds {
nodeMap, ok := (locationMap)[int(t)]
2024-01-06 11:33:46 +00:00
if !ok {
continue
}
for key, value := range nodeMap {
if len(value) < 4 {
continue
2024-01-06 11:33:46 +00:00
}
expiry, ok := value[3].(int64)
if !ok || expiry < ts {
continue
}
addresses, ok := value[1].([]string)
if !ok {
continue
2024-01-06 11:33:46 +00:00
}
storageLocation := storage.NewStorageLocation(int(t), addresses, expiry)
2024-01-06 11:33:46 +00:00
if len(value) > 4 {
if providerMessage, ok := value[4].([]byte); ok {
(storageLocation).SetProviderMessage(providerMessage)
2024-01-06 11:33:46 +00:00
}
}
locations[key] = storageLocation
2024-01-06 11:33:46 +00:00
}
}
return locations, nil
}
func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
locationMap := storage.NewStorageLocationMap()
2024-01-06 11:33:46 +00:00
bytes := n.cacheBucket.Get(hash.FullBytes())
2024-01-06 11:33:46 +00:00
if bytes == nil {
return locationMap, nil
2024-01-06 11:33:46 +00:00
}
err := msgpack.Unmarshal(bytes, locationMap)
2024-01-06 11:33:46 +00:00
if err != nil {
return nil, err
}
return locationMap, nil
2024-01-06 11:33:46 +00:00
}
func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location interfaces.StorageLocation, message []byte, config *config.NodeConfig) error {
2024-01-06 11:33:46 +00:00
// Read existing storage locations
locationDb, err := n.readStorageLocationsFromDB(hash)
2024-01-06 11:33:46 +00:00
if err != nil {
return err
}
nodeIdStr, err := nodeId.ToString()
if err != nil {
return err
}
2024-01-06 11:33:46 +00:00
// Get or create the inner map for the specific type
innerMap, exists := locationDb[location.Type()]
2024-01-06 11:33:46 +00:00
if !exists {
innerMap = make(storage.NodeStorage, 1)
innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1)
2024-01-06 11:33:46 +00:00
}
// Create location map with new data
locationMap := make(map[int]interface{}, 3)
locationMap[1] = location.Parts()
locationMap[3] = location.Expiry()
2024-01-06 11:33:46 +00:00
locationMap[4] = message
// Update the inner map with the new location
innerMap[nodeIdStr] = locationMap
locationDb[location.Type()] = innerMap
2024-01-06 11:33:46 +00:00
// Serialize the updated map and store it in the database
packedBytes, err := msgpack.Marshal(locationDb)
2024-01-06 11:33:46 +00:00
if err != nil {
return err
}
err = n.cacheBucket.Put(hash.FullBytes(), packedBytes)
2024-01-06 11:33:46 +00:00
if err != nil {
return err
}
return nil
}
2024-01-06 11:33:46 +00:00
func (n *NodeImpl) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
// Initialize the download URI provider
dlUriProvider := storage.NewStorageLocationProvider(n, hash)
err := dlUriProvider.Start()
if err != nil {
return nil, err
}
2024-01-06 11:33:46 +00:00
retryCount := 0
for {
dlUri, err := dlUriProvider.Next()
if err != nil {
return nil, err
}
// Log the attempt
log.Printf("[try] %s", dlUri.Location().BytesURL())
2024-01-06 11:33:46 +00:00
res, err := n.httpClient.R().Get(dlUri.Location().BytesURL())
2024-01-06 11:33:46 +00:00
if err != nil {
err := dlUriProvider.Downvote(dlUri)
if err != nil {
return nil, err
}
2024-01-06 11:33:46 +00:00
retryCount++
if retryCount > 32 {
return nil, errors.New("too many retries")
}
continue
}
bodyBytes := res.Body()
2024-01-06 11:33:46 +00:00
return bodyBytes, nil
2024-01-06 11:33:46 +00:00
}
}
func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error) {
var md metadata.Metadata
hashStr, err := cid.Hash.ToString()
if err != nil {
return nil, err
}
2024-01-06 11:33:46 +00:00
if n.metadataCache.Contains(hashStr) {
bytes, err := n.DownloadBytesByHash(&cid.Hash)
2024-01-06 11:33:46 +00:00
if err != nil {
return nil, err
2024-01-06 11:33:46 +00:00
}
switch cid.Type {
case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method
md = metadata.NewEmptyMediaMetadata()
2024-01-06 11:33:46 +00:00
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeMetadataWebapp:
md = metadata.NewEmptyWebAppMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported metadata format")
2024-01-06 11:33:46 +00:00
}
n.metadataCache.Put(hashStr, md)
2024-01-06 11:33:46 +00:00
}
return md, nil
2024-01-06 11:33:46 +00:00
}
func (n *NodeImpl) WaitOnConnectedPeers() {
n.connections.Wait()
}
func (n *NodeImpl) ConnectionTracker() *sync.WaitGroup {
return &n.connections
}