From b60979e79d4fb99ad960c67f48bc2bda5c331709 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Mon, 29 Jan 2024 01:55:36 -0500 Subject: [PATCH] refactor: further refactoring for DI, splitting node responsibilities to a new Storage service, Services, and P2P --- node/node.go | 267 +------------------------- node/services.go | 8 + protocol/base/base.go | 6 +- protocol/handshake_open.go | 8 +- protocol/hash_query.go | 29 +-- protocol/registry_entry.go | 4 +- protocol/registry_query.go | 4 +- protocol/signed/announce_peers.go | 4 +- protocol/signed/handshake_done.go | 11 +- protocol/signed/signed_message.go | 6 +- protocol/storage_location.go | 13 +- service/p2p.go | 9 + service/service.go | 1 + service/storage.go | 300 ++++++++++++++++++++++++++++++ storage/storage.go | 51 ++--- 15 files changed, 396 insertions(+), 325 deletions(-) create mode 100644 service/storage.go diff --git a/node/node.go b/node/node.go index c8d8d10..3b611df 100644 --- a/node/node.go +++ b/node/node.go @@ -1,38 +1,17 @@ package node import ( - "errors" - "fmt" "git.lumeweb.com/LumeWeb/libs5-go/config" - "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/service" - "git.lumeweb.com/LumeWeb/libs5-go/storage" - "git.lumeweb.com/LumeWeb/libs5-go/structs" - "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/libs5-go/utils" - "github.com/go-resty/resty/v2" - "github.com/vmihailenco/msgpack/v5" bolt "go.etcd.io/bbolt" "go.uber.org/zap" - "time" ) -const cacheBucketName = "object-cache" - type Node struct { - nodeConfig *config.NodeConfig - metadataCache structs.Map - hashQueryRoutingTable structs.Map - services service.Services - httpClient *resty.Client - providerStore storage.ProviderStore -} - -func (n *Node) NetworkId() string { - return n.nodeConfig.P2P.Network + nodeConfig *config.NodeConfig + services service.Services } func (n *Node) Services() service.Services { @@ -41,16 +20,10 @@ func (n *Node) Services() service.Services { func NewNode(config *config.NodeConfig, services service.Services) *Node { return &Node{ - nodeConfig: config, - metadataCache: structs.NewMap(), - hashQueryRoutingTable: structs.NewMap(), - httpClient: resty.New(), - services: services, // Services are passed in, not created here + nodeConfig: config, + services: services, // Services are passed in, not created here } } -func (n *Node) HashQueryRoutingTable() structs.Map { - return n.hashQueryRoutingTable -} func (n *Node) IsStarted() bool { return n.services.IsStarted() @@ -77,12 +50,6 @@ func (n *Node) Db() *bolt.DB { func (n *Node) Start() error { protocol.RegisterProtocols() signed.RegisterSignedProtocols() - err := - utils.CreateBucket(cacheBucketName, n.Db()) - - if err != nil { - return err - } return n.services.Start() } @@ -91,236 +58,10 @@ func (n *Node) Stop() error { return n.services.Stop() } -func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { - locations := make(map[string]storage.StorageLocation) - - locationMap, err := n.readStorageLocationsFromDB(hash) - if err != nil { - return nil, err - } - if len(locationMap) == 0 { - return make(map[string]storage.StorageLocation), nil - } - - ts := time.Now().Unix() - - for _, t := range kinds { - nodeMap, ok := (locationMap)[int(t)] - if !ok { - continue - } - - for key, value := range nodeMap { - expiry, ok := value[3].(int64) - if !ok || expiry < ts { - continue - } - - addressesInterface, ok := value[1].([]interface{}) - if !ok { - continue - } - - // Create a slice to hold the strings - addresses := make([]string, len(addressesInterface)) - - // Convert each element to string - for i, v := range addressesInterface { - str, ok := v.(string) - if !ok { - // Handle the error, maybe skip this element or set a default value - continue - } - addresses[i] = str - } - - storageLocation := storage.NewStorageLocation(int(t), addresses, expiry) - - if providerMessage, ok := value[4].([]byte); ok { - (storageLocation).SetProviderMessage(providerMessage) - } - - locations[key] = storageLocation - } - } - return locations, nil -} -func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { - var locationMap storage.StorageLocationMap - - err := n.Db().View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name - if b == nil { - return fmt.Errorf("bucket %s not found", cacheBucketName) - } - - bytes := b.Get(hash.FullBytes()) - if bytes == nil { - // If no data found, return an empty locationMap but no error - locationMap = storage.NewStorageLocationMap() - return nil - } - - return msgpack.Unmarshal(bytes, &locationMap) - }) - - if err != nil { - return nil, err - } - - return locationMap, nil -} - -func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error { - // Read existing storage locations - locationDb, err := n.readStorageLocationsFromDB(hash) - if err != nil { - return err - } - - nodeIdStr, err := nodeId.ToString() - if err != nil { - return err - } - - // Get or create the inner map for the specific type - innerMap, exists := locationDb[location.Type()] - if !exists { - innerMap = make(storage.NodeStorage, 1) - innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1) - } - - // Create location map with new data - locationMap := make(map[int]interface{}, 3) - locationMap[1] = location.Parts() - locationMap[3] = location.Expiry() - locationMap[4] = message - - // Update the inner map with the new location - innerMap[nodeIdStr] = locationMap - locationDb[location.Type()] = innerMap - - // Serialize the updated map and store it in the database - packedBytes, err := msgpack.Marshal(locationDb) - if err != nil { - return err - } - err = n.Db().Update(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(cacheBucketName)) - - return b.Put(hash.FullBytes(), packedBytes) - }) - if err != nil { - return err - } - - return nil -} - -func (n *Node) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { - // Initialize the download URI provider - dlUriProvider := storage.NewStorageLocationProvider(n, hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile) - err := dlUriProvider.Start() - if err != nil { - return nil, err - } - - retryCount := 0 - for { - dlUri, err := dlUriProvider.Next() - if err != nil { - return nil, err - } - - n.Logger().Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL())) - - res, err := n.httpClient.R().Get(dlUri.Location().BytesURL()) - if err != nil { - err := dlUriProvider.Downvote(dlUri) - if err != nil { - return nil, err - } - retryCount++ - if retryCount > 32 { - return nil, errors.New("too many retries") - } - continue - } - - bodyBytes := res.Body() - - return bodyBytes, nil - } -} - -func (n *Node) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) { - bytes, err = n.DownloadBytesByHash(&cid.Hash) - if err != nil { - return nil, err - } - - return bytes, nil -} - -func (n *Node) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) { - hashStr, err := cid.Hash.ToString() - if err != nil { - return nil, err - } - - if n.metadataCache.Contains(hashStr) { - md, _ := n.metadataCache.Get(hashStr) - - return md.(metadata.Metadata), nil - } - - bytes, err := n.DownloadBytesByHash(&cid.Hash) - if err != nil { - return nil, err - } - - switch cid.Type { - case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method - md = metadata.NewEmptyMediaMetadata() - - err = msgpack.Unmarshal(bytes, md) - if err != nil { - return nil, err - } - case types.CIDTypeMetadataWebapp: - md = metadata.NewEmptyWebAppMetadata() - - err = msgpack.Unmarshal(bytes, md) - if err != nil { - return nil, err - } - case types.CIDTypeDirectory: - md = metadata.NewEmptyDirectoryMetadata() - - err = msgpack.Unmarshal(bytes, md) - if err != nil { - return nil, err - } - default: - return nil, errors.New("unsupported metadata format") - } - - n.metadataCache.Put(hashStr, md) - - return md, nil -} func (n *Node) WaitOnConnectedPeers() { n.services.P2P().WaitOnConnectedPeers() } -func (n *Node) SetProviderStore(store storage.ProviderStore) { - n.providerStore = store -} - -func (n *Node) ProviderStore() storage.ProviderStore { - return n.providerStore -} - func DefaultNode(config *config.NodeConfig) *Node { params := service.ServiceParams{ Logger: config.Logger, diff --git a/node/services.go b/node/services.go index e1492d1..1bb0763 100644 --- a/node/services.go +++ b/node/services.go @@ -12,12 +12,14 @@ type ServicesParams struct { P2P *service.P2PService Registry *service.RegistryService HTTP *service.HTTPService + Storage *service.StorageService } type ServicesImpl struct { p2p *service.P2PService registry *service.RegistryService http *service.HTTPService + storage *service.StorageService started bool } @@ -25,11 +27,16 @@ func (s *ServicesImpl) HTTP() *service.HTTPService { return s.http } +func (s *ServicesImpl) Storage() *service.StorageService { + return s.storage +} + func (s *ServicesImpl) All() []service.Service { services := make([]service.Service, 0) services = append(services, s.p2p) services = append(services, s.registry) services = append(services, s.http) + services = append(services, s.storage) return services } @@ -43,6 +50,7 @@ func NewServices(params ServicesParams) service.Services { p2p: params.P2P, registry: params.Registry, http: params.HTTP, + storage: params.Storage, started: false, } diff --git a/protocol/base/base.go b/protocol/base/base.go index b4f7cc8..c2b7781 100644 --- a/protocol/base/base.go +++ b/protocol/base/base.go @@ -3,8 +3,9 @@ package base import ( "context" "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/node" + "git.lumeweb.com/LumeWeb/libs5-go/service" "github.com/vmihailenco/msgpack/v5" + "go.uber.org/zap" "io" ) @@ -22,7 +23,8 @@ type IncomingMessageData struct { Original []byte Data []byte Ctx context.Context - Node *node.Node + Services service.Services + Logger *zap.Logger Peer net.Peer VerifyId bool } diff --git a/protocol/handshake_open.go b/protocol/handshake_open.go index d08b9c7..bc6d37f 100644 --- a/protocol/handshake_open.go +++ b/protocol/handshake_open.go @@ -94,21 +94,21 @@ func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message base.Incomin } func (h *HandshakeOpen) HandleMessage(message base.IncomingMessageData) error { - node := message.Node peer := message.Peer + services := message.Services - if h.networkId != node.NetworkId() { + if h.networkId != services.P2P().NetworkId() { return fmt.Errorf("Peer is in different network: %s", h.networkId) } - handshake := signed.NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, node.Services().P2P().SelfConnectionUris()) + handshake := signed.NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, services.P2P().SelfConnectionUris()) hsMessage, err := msgpack.Marshal(handshake) if err != nil { return err } - secureMessage, err := node.Services().P2P().SignMessageSimple(hsMessage) + secureMessage, err := services.P2P().SignMessageSimple(hsMessage) if err != nil { return err diff --git a/protocol/hash_query.go b/protocol/hash_query.go index edbffcd..ff7015e 100644 --- a/protocol/hash_query.go +++ b/protocol/hash_query.go @@ -88,10 +88,11 @@ func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error { } func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { - node := message.Node peer := message.Peer + services := message.Services + logger := message.Logger - mapLocations, err := node.GetCachedStorageLocations(h.hash, h.kinds) + mapLocations, err := services.Storage().GetCachedStorageLocations(h.hash, h.kinds) if err != nil { log.Printf("Error getting cached storage locations: %v", err) return err @@ -102,14 +103,14 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { for key := range mapLocations { nodeId, err := encoding.DecodeNodeId(key) if err != nil { - node.Logger().Error("Error decoding node id", zap.Error(err)) + logger.Error("Error decoding node id", zap.Error(err)) continue } availableNodes = append(availableNodes, nodeId) } - score, err := node.Services().P2P().SortNodesByScore(availableNodes) + score, err := services.P2P().SortNodesByScore(availableNodes) if err != nil { return err } @@ -128,16 +129,16 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { } } - if node.ProviderStore() != nil { - if node.ProviderStore().CanProvide(h.hash, h.kinds) { - location, err := node.ProviderStore().Provide(h.hash, h.kinds) + if services.Storage().ProviderStore() != nil { + if services.Storage().ProviderStore().CanProvide(h.hash, h.kinds) { + location, err := services.Storage().ProviderStore().Provide(h.hash, h.kinds) if err != nil { return err } - message := node.Services().P2P().PrepareProvideMessage(h.hash, location) + message := services.P2P().PrepareProvideMessage(h.hash, location) - err = node.AddStorageLocation(h.hash, node.Services().P2P().NodeId(), location, message) + err = services.Storage().AddStorageLocation(h.hash, services.P2P().NodeId(), location, message) if err != nil { return err } @@ -151,11 +152,11 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { var peers *hashset.Set hashString, err := h.hash.ToString() - node.Logger().Debug("HashQuery", zap.Any("hashString", hashString)) + logger.Debug("HashQuery", zap.Any("hashString", hashString)) if err != nil { return err } - peersVal, ok := node.HashQueryRoutingTable().Get(hashString) // Implement HashQueryRoutingTable method + peersVal, ok := services.P2P().HashQueryRoutingTable().Get(hashString) // Implement HashQueryRoutingTable method if ok { peers = peersVal.(*hashset.Set) if !peers.Contains(peer.Id()) { @@ -168,14 +169,14 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { peerList := hashset.New() peerList.Add(peer.Id()) - node.HashQueryRoutingTable().Put(hashString, peerList) + services.P2P().HashQueryRoutingTable().Put(hashString, peerList) - for _, val := range node.Services().P2P().Peers().Values() { + for _, val := range services.P2P().Peers().Values() { peerVal := val.(net.Peer) if !peerVal.Id().Equals(peer.Id()) { err := peerVal.SendMessage(message.Original) if err != nil { - node.Logger().Error("Failed to send message", zap.Error(err)) + logger.Error("Failed to send message", zap.Error(err)) } } } diff --git a/protocol/registry_entry.go b/protocol/registry_entry.go index 5577f6f..8e81f5c 100644 --- a/protocol/registry_entry.go +++ b/protocol/registry_entry.go @@ -51,7 +51,7 @@ func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message base. } func (s *RegistryEntryRequest) HandleMessage(message base.IncomingMessageData) error { - node := message.Node peer := message.Peer - return node.Services().Registry().Set(s.sre, false, peer) + services := message.Services + return services.Registry().Set(s.sre, false, peer) } diff --git a/protocol/registry_query.go b/protocol/registry_query.go index 5b975ab..e18a9a5 100644 --- a/protocol/registry_query.go +++ b/protocol/registry_query.go @@ -51,9 +51,9 @@ func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message base.Incomin } func (s *RegistryQuery) HandleMessage(message base.IncomingMessageData) error { - node := message.Node + services := message.Services peer := message.Peer - sre, err := node.Services().Registry().Get(s.pk) + sre, err := services.Registry().Get(s.pk) if err != nil { return err } diff --git a/protocol/signed/announce_peers.go b/protocol/signed/announce_peers.go index f53f600..85380b6 100644 --- a/protocol/signed/announce_peers.go +++ b/protocol/signed/announce_peers.go @@ -106,10 +106,10 @@ func (a *AnnouncePeers) DecodeMessage(dec *msgpack.Decoder, message IncomingMess } func (a AnnouncePeers) HandleMessage(message IncomingMessageDataSigned) error { - node := message.Node + services := message.Services peer := message.Peer if len(a.connectionUris) > 0 { - err := node.Services().P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer) + err := services.P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer) if err != nil { return err } diff --git a/protocol/signed/handshake_done.go b/protocol/signed/handshake_done.go index 52e13fb..4c6bc70 100644 --- a/protocol/signed/handshake_done.go +++ b/protocol/signed/handshake_done.go @@ -77,12 +77,13 @@ func NewHandshakeDone() *HandshakeDone { } func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error { - node := message.Node + services := message.Services peer := message.Peer verifyId := message.VerifyId nodeId := message.NodeId + logger := message.Logger - if !node.IsStarted() { + if !services.IsStarted() { err := peer.End() if err != nil { return nil @@ -108,7 +109,7 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error { if h.supportedFeatures != types.SupportedFeatures { return fmt.Errorf("Remote node does not support required features") } - err := node.Services().P2P().AddPeer(peer) + err := services.P2P().AddPeer(peer) if err != nil { return err } @@ -121,9 +122,9 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error { return err } - node.Logger().Info(fmt.Sprintf("[+] %s (%s)", peerId, peer.RenderLocationURI())) + logger.Info(fmt.Sprintf("[+] %s (%s)", peerId, peer.RenderLocationURI())) - err = node.Services().P2P().SendPublicPeersToPeer(peer, []net.Peer{peer}) + err = services.P2P().SendPublicPeersToPeer(peer, []net.Peer{peer}) if err != nil { return err } diff --git a/protocol/signed/signed_message.go b/protocol/signed/signed_message.go index 9d34eba..1a8405a 100644 --- a/protocol/signed/signed_message.go +++ b/protocol/signed/signed_message.go @@ -83,8 +83,8 @@ func NewSignedMessage() *SignedMessage { func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error { var payload signedMessageReader - node := message.Node peer := message.Peer + logger := message.Logger err := msgpack.Unmarshal(s.message, &payload) if err != nil { @@ -92,9 +92,9 @@ func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error { } if msgHandler, valid := GetMessageType(payload.kind); valid { - node.Logger().Debug("SignedMessage", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)])) + logger.Debug("SignedMessage", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)])) if msgHandler.RequiresHandshake() && !peer.IsHandshakeDone() { - node.Logger().Debug("Peer is not handshake done, ignoring message", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)])) + logger.Debug("Peer is not handshake done, ignoring message", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)])) return nil } err := msgpack.Unmarshal(payload.message, &msgHandler) diff --git a/protocol/storage_location.go b/protocol/storage_location.go index c76e9fb..327a1e5 100644 --- a/protocol/storage_location.go +++ b/protocol/storage_location.go @@ -40,8 +40,9 @@ func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message base.Incom } func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error { msg := message.Original - node := message.Node + services := message.Services peer := message.Peer + logger := message.Logger hash := encoding.NewMultihash(msg[1:34]) // Replace NewMultihash with appropriate function @@ -80,7 +81,7 @@ func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error nodeId := encoding.NewNodeId(publicKey) // Assuming `node` is an instance of your NodeImpl structure - err := node.AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg) // Implement AddStorageLocation + err := services.Storage().AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg) // Implement AddStorageLocation if err != nil { return fmt.Errorf("Failed to add storage location: %s", err) @@ -92,7 +93,7 @@ func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error } var list *hashset.Set - listVal, ok := node.HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method + listVal, ok := services.P2P().HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method if !ok { list = hashset.New() } else { @@ -109,16 +110,16 @@ func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error if err != nil { return err } - if peerVal, ok := node.Services().P2P().Peers().Get(peerIdStr); ok { + if peerVal, ok := services.P2P().Peers().Get(peerIdStr); ok { foundPeer := peerVal.(net.Peer) err := foundPeer.SendMessage(msg) if err != nil { - node.Logger().Error("Failed to send message", zap.Error(err)) + logger.Error("Failed to send message", zap.Error(err)) continue } } - node.HashQueryRoutingTable().Remove(hash.HashCode()) + services.P2P().HashQueryRoutingTable().Remove(hash.HashCode()) } return nil diff --git a/service/p2p.go b/service/p2p.go index 4bb2949..803851f 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -50,6 +50,7 @@ type P2PService struct { outgoingPeerFailures structs.Map maxOutgoingPeerFailures uint connections sync.WaitGroup + hashQueryRoutingTable structs.Map ServiceBase } @@ -72,6 +73,7 @@ func NewP2P(params ServiceParams) *P2PService { incomingIPBlocklist: structs.NewMap(), outgoingPeerFailures: structs.NewMap(), maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures, + hashQueryRoutingTable: structs.NewMap(), ServiceBase: ServiceBase{ logger: params.Logger, config: params.Config, @@ -711,3 +713,10 @@ func (p *P2PService) WaitOnConnectedPeers() { func (p *P2PService) ConnectionTracker() *sync.WaitGroup { return &p.connections } + +func (p *P2PService) NetworkId() string { + return p.config.P2P.Network +} +func (n *P2PService) HashQueryRoutingTable() structs.Map { + return n.hashQueryRoutingTable +} diff --git a/service/service.go b/service/service.go index 128adce..6ce301d 100644 --- a/service/service.go +++ b/service/service.go @@ -16,6 +16,7 @@ type Services interface { P2P() *P2PService Registry() *RegistryService HTTP() *HTTPService + Storage() *StorageService All() []Service IsStarted() bool Start() error diff --git a/service/storage.go b/service/storage.go new file mode 100644 index 0000000..fa57405 --- /dev/null +++ b/service/storage.go @@ -0,0 +1,300 @@ +package service + +import ( + "errors" + "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/metadata" + "git.lumeweb.com/LumeWeb/libs5-go/storage" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "git.lumeweb.com/LumeWeb/libs5-go/utils" + "github.com/go-resty/resty/v2" + "github.com/vmihailenco/msgpack/v5" + "go.etcd.io/bbolt" + "go.uber.org/zap" + "time" +) + +const cacheBucketName = "object-cache" + +var ( + _ Service = (*StorageService)(nil) +) + +type StorageService struct { + httpClient *resty.Client + metadataCache structs.Map + providerStore storage.ProviderStore + ServiceBase +} + +func NewStorage(params ServiceParams) *StorageService { + return &StorageService{ + httpClient: resty.New(), + metadataCache: structs.NewMap(), + ServiceBase: ServiceBase{ + logger: params.Logger, + config: params.Config, + db: params.Db, + }, + } +} + +func (s *StorageService) Start() error { + err := + utils.CreateBucket(cacheBucketName, s.db) + + if err != nil { + return err + } + + return nil +} + +func (s *StorageService) Stop() error { + return nil +} + +func (s *StorageService) Init() error { + return nil +} + +func (n *StorageService) SetProviderStore(store storage.ProviderStore) { + n.providerStore = store +} + +func (n *StorageService) ProviderStore() storage.ProviderStore { + return n.providerStore +} + +func (s *StorageService) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { + locations := make(map[string]storage.StorageLocation) + + locationMap, err := s.readStorageLocationsFromDB(hash) + if err != nil { + return nil, err + } + if len(locationMap) == 0 { + return make(map[string]storage.StorageLocation), nil + } + + ts := time.Now().Unix() + + for _, t := range kinds { + nodeMap, ok := (locationMap)[int(t)] + if !ok { + continue + } + + for key, value := range nodeMap { + expiry, ok := value[3].(int64) + if !ok || expiry < ts { + continue + } + + addressesInterface, ok := value[1].([]interface{}) + if !ok { + continue + } + + // Create a slice to hold the strings + addresses := make([]string, len(addressesInterface)) + + // Convert each element to string + for i, v := range addressesInterface { + str, ok := v.(string) + if !ok { + // Handle the error, maybe skip this element or set a default value + continue + } + addresses[i] = str + } + + storageLocation := storage.NewStorageLocation(int(t), addresses, expiry) + + if providerMessage, ok := value[4].([]byte); ok { + (storageLocation).SetProviderMessage(providerMessage) + } + + locations[key] = storageLocation + } + } + return locations, nil +} +func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { + var locationMap storage.StorageLocationMap + + err := s.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name + if b == nil { + return fmt.Errorf("bucket %s not found", cacheBucketName) + } + + bytes := b.Get(hash.FullBytes()) + if bytes == nil { + // If no data found, return an empty locationMap but no error + locationMap = storage.NewStorageLocationMap() + return nil + } + + return msgpack.Unmarshal(bytes, &locationMap) + }) + + if err != nil { + return nil, err + } + + return locationMap, nil +} + +func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error { + // Read existing storage locations + locationDb, err := s.readStorageLocationsFromDB(hash) + if err != nil { + return err + } + + nodeIdStr, err := nodeId.ToString() + if err != nil { + return err + } + + // Get or create the inner map for the specific type + innerMap, exists := locationDb[location.Type()] + if !exists { + innerMap = make(storage.NodeStorage, 1) + innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1) + } + + // Create location map with new data + locationMap := make(map[int]interface{}, 3) + locationMap[1] = location.Parts() + locationMap[3] = location.Expiry() + locationMap[4] = message + + // Update the inner map with the new location + innerMap[nodeIdStr] = locationMap + locationDb[location.Type()] = innerMap + + // Serialize the updated map and store it in the database + packedBytes, err := msgpack.Marshal(locationDb) + if err != nil { + return err + } + err = s.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(cacheBucketName)) + + return b.Put(hash.FullBytes(), packedBytes) + }) + if err != nil { + return err + } + + return nil +} + +func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { + // Initialize the download URI provider + dlUriProvider := storage.NewStorageLocationProvider(storage.StorageLocationProviderParams{ + Services: s.services, + Hash: hash, + LocationTypes: []types.StorageLocationType{ + types.StorageLocationTypeFull, + types.StorageLocationTypeFile, + }, + ServiceParams: ServiceParams{ + Logger: s.logger, + Config: s.config, + Db: s.db, + }, + }) + err := dlUriProvider.Start() + if err != nil { + return nil, err + } + + retryCount := 0 + for { + dlUri, err := dlUriProvider.Next() + if err != nil { + return nil, err + } + + s.logger.Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL())) + + res, err := s.httpClient.R().Get(dlUri.Location().BytesURL()) + if err != nil { + err := dlUriProvider.Downvote(dlUri) + if err != nil { + return nil, err + } + retryCount++ + if retryCount > 32 { + return nil, errors.New("too many retries") + } + continue + } + + bodyBytes := res.Body() + + return bodyBytes, nil + } +} + +func (s *StorageService) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) { + bytes, err = s.DownloadBytesByHash(&cid.Hash) + if err != nil { + return nil, err + } + + return bytes, nil +} + +func (s *StorageService) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) { + hashStr, err := cid.Hash.ToString() + if err != nil { + return nil, err + } + + if s.metadataCache.Contains(hashStr) { + md, _ := s.metadataCache.Get(hashStr) + + return md.(metadata.Metadata), nil + } + + bytes, err := s.DownloadBytesByHash(&cid.Hash) + if err != nil { + return nil, err + } + + switch cid.Type { + case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method + md = metadata.NewEmptyMediaMetadata() + + err = msgpack.Unmarshal(bytes, md) + if err != nil { + return nil, err + } + case types.CIDTypeMetadataWebapp: + md = metadata.NewEmptyWebAppMetadata() + + err = msgpack.Unmarshal(bytes, md) + if err != nil { + return nil, err + } + case types.CIDTypeDirectory: + md = metadata.NewEmptyDirectoryMetadata() + + err = msgpack.Unmarshal(bytes, md) + if err != nil { + return nil, err + } + default: + return nil, errors.New("unsupported metadata format") + } + + s.metadataCache.Put(hashStr, md) + + return md, nil +} diff --git a/storage/storage.go b/storage/storage.go index a8a5eb9..32378e7 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -4,7 +4,7 @@ import ( "bytes" "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - _node "git.lumeweb.com/LumeWeb/libs5-go/node" + "git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" "go.uber.org/zap" @@ -33,6 +33,13 @@ type StorageLocationImpl struct { providerMessage []byte } +type StorageLocationProviderParams struct { + Services service.Services + Hash *encoding.Multihash + LocationTypes []types.StorageLocationType + service.ServiceParams +} + func (s *StorageLocationImpl) Type() int { return s.kind } @@ -184,7 +191,7 @@ func NewStorageLocationMap() StorageLocationMap { } type StorageLocationProviderImpl struct { - node *_node.Node + services service.Services hash *encoding.Multihash types []types.StorageLocationType timeoutDuration time.Duration @@ -194,12 +201,13 @@ type StorageLocationProviderImpl struct { isTimedOut bool isWaitingForUri bool mutex sync.Mutex + logger *zap.Logger } func (s *StorageLocationProviderImpl) Start() error { var err error - s.uris, err = s.node.GetCachedStorageLocations(s.hash, s.types) + s.uris, err = s.services.Storage().GetCachedStorageLocations(s.hash, s.types) if err != nil { return err } @@ -212,10 +220,9 @@ func (s *StorageLocationProviderImpl) Start() error { } s.availableNodes = append(s.availableNodes, nodeId) - } - s.availableNodes, err = s.node.Services().P2P().SortNodesByScore(s.availableNodes) + s.availableNodes, err = s.services.P2P().SortNodesByScore(s.availableNodes) if err != nil { s.mutex.Unlock() return err @@ -235,18 +242,17 @@ func (s *StorageLocationProviderImpl) Start() error { break } - newUris, err := s.node.GetCachedStorageLocations(s.hash, s.types) + newUris, err := s.services.Storage().GetCachedStorageLocations(s.hash, s.types) if err != nil { s.mutex.Unlock() break } - //s.node.Logger().Debug("New URIs", zap.Any("uris", newUris), zap.Any("availableNodes", s.availableNodes), zap.Any("timeout", s.timeout), zap.Any("isTimedOut", s.isTimedOut), zap.Any("isWaitingForUri", s.isWaitingForUri), zap.Any("requestSent", requestSent)) if len(s.availableNodes) == 0 && len(newUris) < 2 && !requestSent { - s.node.Logger().Debug("Sending hash request") - err := s.node.Services().P2P().SendHashRequest(s.hash, s.types) + s.logger.Debug("Sending hash request") + err := s.services.P2P().SendHashRequest(s.hash, s.types) if err != nil { - s.node.Logger().Error("Error sending hash request", zap.Error(err)) + s.logger.Error("Error sending hash request", zap.Error(err)) continue } requestSent = true @@ -258,7 +264,7 @@ func (s *StorageLocationProviderImpl) Start() error { s.uris[k] = v nodeId, err := encoding.DecodeNodeId(k) if err != nil { - s.node.Logger().Error("Error decoding node id", zap.Error(err)) + s.logger.Error("Error decoding node id", zap.Error(err)) continue } if !containsNode(s.availableNodes, nodeId) { @@ -269,9 +275,9 @@ func (s *StorageLocationProviderImpl) Start() error { } if hasNewNode { - score, err := s.node.Services().P2P().SortNodesByScore(s.availableNodes) + score, err := s.services.P2P().SortNodesByScore(s.availableNodes) if err != nil { - s.node.Logger().Error("Error sorting nodes by score", zap.Error(err)) + s.logger.Error("Error sorting nodes by score", zap.Error(err)) } else { s.availableNodes = score } @@ -299,7 +305,7 @@ func (s *StorageLocationProviderImpl) Next() (SignedStorageLocation, error) { uri, exists := s.uris[nodIdStr] if !exists { - s.node.Logger().Error("Could not find uri for node id", zap.String("nodeId", nodIdStr)) + s.logger.Error("Could not find uri for node id", zap.String("nodeId", nodIdStr)) continue } @@ -320,7 +326,7 @@ func (s *StorageLocationProviderImpl) Next() (SignedStorageLocation, error) { } func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error { - err := s.node.Services().P2P().UpVote(uri.NodeId()) + err := s.services.P2P().UpVote(uri.NodeId()) if err != nil { return err } @@ -329,26 +335,27 @@ func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error { } func (s *StorageLocationProviderImpl) Downvote(uri SignedStorageLocation) error { - err := s.node.Services().P2P().DownVote(uri.NodeId()) + err := s.services.P2P().DownVote(uri.NodeId()) if err != nil { return err } return nil } -func NewStorageLocationProvider(node *_node.Node, hash *encoding.Multihash, locationTypes ...types.StorageLocationType) StorageLocationProvider { - if locationTypes == nil { - locationTypes = []types.StorageLocationType{ +func NewStorageLocationProvider(params StorageLocationProviderParams) *StorageLocationProviderImpl { + if params.LocationTypes == nil { + params.LocationTypes = []types.StorageLocationType{ types.StorageLocationTypeFull, } } return &StorageLocationProviderImpl{ - node: node, - hash: hash, - types: locationTypes, + services: params.Services, + hash: params.Hash, + types: params.LocationTypes, timeoutDuration: 60 * time.Second, uris: make(map[string]StorageLocation), + logger: params.Logger, } } func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool {