From b2c06590b1f12f1f62d05d7e3e15645cb73b268b Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Mon, 29 Jan 2024 19:24:50 -0500 Subject: [PATCH] refactoring: more refactoring to break import cycles --- service/storage.go | 3 +- storage/provider/provider.go | 199 +++++++++++++++++++++++++++++++++++ storage/storage.go | 198 +--------------------------------- 3 files changed, 205 insertions(+), 195 deletions(-) create mode 100644 storage/provider/provider.go diff --git a/service/storage.go b/service/storage.go index fa57405..49d44c7 100644 --- a/service/storage.go +++ b/service/storage.go @@ -6,6 +6,7 @@ import ( "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/storage" + "git.lumeweb.com/LumeWeb/libs5-go/storage/provider" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/utils" @@ -196,7 +197,7 @@ func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *en func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { // Initialize the download URI provider - dlUriProvider := storage.NewStorageLocationProvider(storage.StorageLocationProviderParams{ + dlUriProvider := provider.NewStorageLocationProvider(provider.StorageLocationProviderParams{ Services: s.services, Hash: hash, LocationTypes: []types.StorageLocationType{ diff --git a/storage/provider/provider.go b/storage/provider/provider.go new file mode 100644 index 0000000..c39841e --- /dev/null +++ b/storage/provider/provider.go @@ -0,0 +1,199 @@ +package provider + +import ( + "bytes" + "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/service" + "git.lumeweb.com/LumeWeb/libs5-go/storage" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "go.uber.org/zap" + "sync" + "time" +) + +var _ storage.StorageLocationProvider = (*StorageLocationProviderImpl)(nil) + +type StorageLocationProviderImpl struct { + services service.Services + hash *encoding.Multihash + types []types.StorageLocationType + timeoutDuration time.Duration + availableNodes []*encoding.NodeId + uris map[string]storage.StorageLocation + timeout time.Time + isTimedOut bool + isWaitingForUri bool + mutex sync.Mutex + logger *zap.Logger +} + +func (s *StorageLocationProviderImpl) Start() error { + var err error + + s.uris, err = s.services.Storage().GetCachedStorageLocations(s.hash, s.types) + if err != nil { + return err + } + s.mutex.Lock() + s.availableNodes = make([]*encoding.NodeId, 0, len(s.uris)) + for k := range s.uris { + nodeId, err := encoding.DecodeNodeId(k) + if err != nil { + continue + } + + s.availableNodes = append(s.availableNodes, nodeId) + } + + s.availableNodes, err = s.services.P2P().SortNodesByScore(s.availableNodes) + if err != nil { + s.mutex.Unlock() + return err + } + + s.timeout = time.Now().Add(s.timeoutDuration) + s.isTimedOut = false + s.mutex.Unlock() + go func() { + requestSent := false + + for { + s.mutex.Lock() + if time.Now().After(s.timeout) { + s.isTimedOut = true + s.mutex.Unlock() + break + } + + newUris, err := s.services.Storage().GetCachedStorageLocations(s.hash, s.types) + if err != nil { + s.mutex.Unlock() + break + } + + if len(s.availableNodes) == 0 && len(newUris) < 2 && !requestSent { + s.logger.Debug("Sending hash request") + err := s.services.P2P().SendHashRequest(s.hash, s.types) + if err != nil { + s.logger.Error("Error sending hash request", zap.Error(err)) + continue + } + requestSent = true + } + + hasNewNode := false + for k, v := range newUris { + if _, exists := s.uris[k]; !exists || s.uris[k] != v { + s.uris[k] = v + nodeId, err := encoding.DecodeNodeId(k) + if err != nil { + s.logger.Error("Error decoding node id", zap.Error(err)) + continue + } + if !containsNode(s.availableNodes, nodeId) { + s.availableNodes = append(s.availableNodes, nodeId) + hasNewNode = true + } + } + } + + if hasNewNode { + score, err := s.services.P2P().SortNodesByScore(s.availableNodes) + if err != nil { + s.logger.Error("Error sorting nodes by score", zap.Error(err)) + } else { + s.availableNodes = score + } + } + s.mutex.Unlock() + + time.Sleep(10 * time.Millisecond) + } + }() + return nil +} +func (s *StorageLocationProviderImpl) Next() (storage.SignedStorageLocation, error) { + s.timeout = time.Now().Add(s.timeoutDuration) + + for { + if len(s.availableNodes) > 0 { + s.isWaitingForUri = false + nodeId := s.availableNodes[0] + s.availableNodes = s.availableNodes[1:] + + nodIdStr, err := nodeId.ToString() + if err != nil { + return nil, err + } + + uri, exists := s.uris[nodIdStr] + if !exists { + s.logger.Error("Could not find uri for node id", zap.String("nodeId", nodIdStr)) + continue + } + + return storage.NewSignedStorageLocation(nodeId, uri), nil + } + + s.isWaitingForUri = true + if s.isTimedOut { + hashStr, err := s.hash.ToString() + if err != nil { + return nil, err + } + return nil, fmt.Errorf("Could not download raw file: Timed out after %s %s", s.timeoutDuration.String(), hashStr) + } + + time.Sleep(10 * time.Millisecond) // Replace with a proper wait/notify mechanism if applicable + } +} + +func (s *StorageLocationProviderImpl) Upvote(uri storage.SignedStorageLocation) error { + err := s.services.P2P().UpVote(uri.NodeId()) + if err != nil { + return err + } + + return nil +} + +func (s *StorageLocationProviderImpl) Downvote(uri storage.SignedStorageLocation) error { + err := s.services.P2P().DownVote(uri.NodeId()) + if err != nil { + return err + } + return nil +} + +func NewStorageLocationProvider(params StorageLocationProviderParams) *StorageLocationProviderImpl { + if params.LocationTypes == nil { + params.LocationTypes = []types.StorageLocationType{ + types.StorageLocationTypeFull, + } + } + + return &StorageLocationProviderImpl{ + services: params.Services, + hash: params.Hash, + types: params.LocationTypes, + timeoutDuration: 60 * time.Second, + uris: make(map[string]storage.StorageLocation), + logger: params.Logger, + } +} +func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool { + for _, v := range slice { + if bytes.Equal(v.Bytes(), item.Bytes()) { + return true + } + } + return false +} + +type StorageLocationProviderParams struct { + Services service.Services + Hash *encoding.Multihash + LocationTypes []types.StorageLocationType + service.ServiceParams +} diff --git a/storage/storage.go b/storage/storage.go index 83b51ee..8c96454 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,37 +1,24 @@ package storage import ( - "bytes" "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/service" - "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" - "go.uber.org/zap" "strconv" - "sync" "time" ) var ( - _ msgpack.CustomDecoder = (*StorageLocationMap)(nil) - _ msgpack.CustomEncoder = (*StorageLocationMap)(nil) - _ StorageLocation = (*StorageLocationImpl)(nil) - _ StorageLocationProvider = (*StorageLocationProviderImpl)(nil) - _ SignedStorageLocation = (*SignedStorageLocationImpl)(nil) + _ msgpack.CustomDecoder = (*StorageLocationMap)(nil) + _ msgpack.CustomEncoder = (*StorageLocationMap)(nil) + _ StorageLocation = (*StorageLocationImpl)(nil) + _ SignedStorageLocation = (*SignedStorageLocationImpl)(nil) ) type StorageLocationMap map[int]NodeStorage type NodeStorage map[string]NodeDetailsStorage type NodeDetailsStorage map[int]interface{} -type StorageLocationProviderParams struct { - Services service.Services - Hash *encoding.Multihash - LocationTypes []types.StorageLocationType - service.ServiceParams -} - func (s *StorageLocationImpl) BytesURL() string { return s.parts[0] } @@ -134,183 +121,6 @@ func NewStorageLocationMap() StorageLocationMap { return StorageLocationMap{} } -type StorageLocationProviderImpl struct { - services service.Services - hash *encoding.Multihash - types []types.StorageLocationType - timeoutDuration time.Duration - availableNodes []*encoding.NodeId - uris map[string]StorageLocation - timeout time.Time - isTimedOut bool - isWaitingForUri bool - mutex sync.Mutex - logger *zap.Logger -} - -func (s *StorageLocationProviderImpl) Start() error { - var err error - - s.uris, err = s.services.Storage().GetCachedStorageLocations(s.hash, s.types) - if err != nil { - return err - } - s.mutex.Lock() - s.availableNodes = make([]*encoding.NodeId, 0, len(s.uris)) - for k := range s.uris { - nodeId, err := encoding.DecodeNodeId(k) - if err != nil { - continue - } - - s.availableNodes = append(s.availableNodes, nodeId) - } - - s.availableNodes, err = s.services.P2P().SortNodesByScore(s.availableNodes) - if err != nil { - s.mutex.Unlock() - return err - } - - s.timeout = time.Now().Add(s.timeoutDuration) - s.isTimedOut = false - s.mutex.Unlock() - go func() { - requestSent := false - - for { - s.mutex.Lock() - if time.Now().After(s.timeout) { - s.isTimedOut = true - s.mutex.Unlock() - break - } - - newUris, err := s.services.Storage().GetCachedStorageLocations(s.hash, s.types) - if err != nil { - s.mutex.Unlock() - break - } - - if len(s.availableNodes) == 0 && len(newUris) < 2 && !requestSent { - s.logger.Debug("Sending hash request") - err := s.services.P2P().SendHashRequest(s.hash, s.types) - if err != nil { - s.logger.Error("Error sending hash request", zap.Error(err)) - continue - } - requestSent = true - } - - hasNewNode := false - for k, v := range newUris { - if _, exists := s.uris[k]; !exists || s.uris[k] != v { - s.uris[k] = v - nodeId, err := encoding.DecodeNodeId(k) - if err != nil { - s.logger.Error("Error decoding node id", zap.Error(err)) - continue - } - if !containsNode(s.availableNodes, nodeId) { - s.availableNodes = append(s.availableNodes, nodeId) - hasNewNode = true - } - } - } - - if hasNewNode { - score, err := s.services.P2P().SortNodesByScore(s.availableNodes) - if err != nil { - s.logger.Error("Error sorting nodes by score", zap.Error(err)) - } else { - s.availableNodes = score - } - } - s.mutex.Unlock() - - time.Sleep(10 * time.Millisecond) - } - }() - return nil -} -func (s *StorageLocationProviderImpl) Next() (SignedStorageLocation, error) { - s.timeout = time.Now().Add(s.timeoutDuration) - - for { - if len(s.availableNodes) > 0 { - s.isWaitingForUri = false - nodeId := s.availableNodes[0] - s.availableNodes = s.availableNodes[1:] - - nodIdStr, err := nodeId.ToString() - if err != nil { - return nil, err - } - - uri, exists := s.uris[nodIdStr] - if !exists { - s.logger.Error("Could not find uri for node id", zap.String("nodeId", nodIdStr)) - continue - } - - return NewSignedStorageLocation(nodeId, uri), nil - } - - s.isWaitingForUri = true - if s.isTimedOut { - hashStr, err := s.hash.ToString() - if err != nil { - return nil, err - } - return nil, fmt.Errorf("Could not download raw file: Timed out after %s %s", s.timeoutDuration.String(), hashStr) - } - - time.Sleep(10 * time.Millisecond) // Replace with a proper wait/notify mechanism if applicable - } -} - -func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error { - err := s.services.P2P().UpVote(uri.NodeId()) - if err != nil { - return err - } - - return nil -} - -func (s *StorageLocationProviderImpl) Downvote(uri SignedStorageLocation) error { - err := s.services.P2P().DownVote(uri.NodeId()) - if err != nil { - return err - } - return nil -} - -func NewStorageLocationProvider(params StorageLocationProviderParams) *StorageLocationProviderImpl { - if params.LocationTypes == nil { - params.LocationTypes = []types.StorageLocationType{ - types.StorageLocationTypeFull, - } - } - - return &StorageLocationProviderImpl{ - services: params.Services, - hash: params.Hash, - types: params.LocationTypes, - timeoutDuration: 60 * time.Second, - uris: make(map[string]StorageLocation), - logger: params.Logger, - } -} -func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool { - for _, v := range slice { - if bytes.Equal(v.Bytes(), item.Bytes()) { - return true - } - } - return false -} - type StorageLocationProvider interface { Start() error Next() (SignedStorageLocation, error)