diff --git a/interfaces/storage.go b/interfaces/storage.go index 31d7051..adf613f 100644 --- a/interfaces/storage.go +++ b/interfaces/storage.go @@ -2,6 +2,13 @@ package interfaces //go:generate mockgen -source=storage.go -destination=../mocks/interfaces/storage.go -package=interfaces +type StorageLocationProvider interface { + Start() error + Next() (SignedStorageLocation, error) + Upvote(uri SignedStorageLocation) error + Downvote(uri SignedStorageLocation) error +} + type StorageLocation interface { BytesURL() string OutboardBytesURL() string diff --git a/storage/storage.go b/storage/storage.go index 1eb0f0a..e0ec565 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -155,3 +155,178 @@ func (s StorageLocationMap) EncodeMsgpack(enc *msgpack.Encoder) error { func NewStorageLocationMap() StorageLocationMap { return StorageLocationMap{} } + +type StorageLocationProviderImpl struct { + node interfaces.Node + hash *encoding.Multihash + types []types.StorageLocationType + timeoutDuration time.Duration + availableNodes []*encoding.NodeId + uris map[string]interfaces.StorageLocation + timeout time.Time + isTimedOut bool + isWaitingForUri bool + mutex sync.Mutex +} + +func (s *StorageLocationProviderImpl) Start() error { + var err error + + s.uris, err = s.node.GetCachedStorageLocations(s.hash, s.types) + if err != nil { + return err + } + s.mutex.Lock() + s.availableNodes = make([]*encoding.NodeId, 0, len(s.uris)) + for k := range s.uris { + nodeId, err := encoding.DecodeNodeId(k) + if err != nil { + continue + } + + s.availableNodes = append(s.availableNodes, nodeId) + + } + + s.availableNodes, err = s.node.Services().P2P().SortNodesByScore(s.availableNodes) + if err != nil { + s.mutex.Unlock() + return err + } + + s.timeout = time.Now().Add(s.timeoutDuration) + s.isTimedOut = false + s.mutex.Unlock() + go func() { + requestSent := false + + for { + s.mutex.Lock() + if time.Now().After(s.timeout) { + s.isTimedOut = true + s.mutex.Unlock() + break + } + + newUris, err := s.node.GetCachedStorageLocations(s.hash, s.types) + if err != nil { + s.mutex.Unlock() + break + } + + if len(s.availableNodes) == 0 && len(newUris) < 2 && !requestSent { + err := s.node.Services().P2P().SendHashRequest(s.hash, s.types) + if err != nil { + s.node.Logger().Error("Error sending hash request", zap.Error(err)) + continue + } + requestSent = true + } + + hasNewNode := false + for k, v := range newUris { + if _, exists := s.uris[k]; !exists || s.uris[k] != v { + s.uris[k] = v + nodeId, err := encoding.DecodeNodeId(k) + if err != nil { + s.node.Logger().Error("Error decoding node id", zap.Error(err)) + continue + } + if !containsNode(s.availableNodes, nodeId) { + s.availableNodes = append(s.availableNodes, nodeId) + hasNewNode = true + } + } + } + + if hasNewNode { + score, err := s.node.Services().P2P().SortNodesByScore(s.availableNodes) + if err != nil { + s.node.Logger().Error("Error sorting nodes by score", zap.Error(err)) + } else { + s.availableNodes = score + } + } + s.mutex.Unlock() + + time.Sleep(10 * time.Millisecond) + } + }() + return nil +} +func (s *StorageLocationProviderImpl) Next() (interfaces.SignedStorageLocation, error) { + s.timeout = time.Now().Add(s.timeoutDuration) + + for { + if len(s.availableNodes) > 0 { + s.isWaitingForUri = false + nodeId := s.availableNodes[0] + s.availableNodes = s.availableNodes[1:] + + nodIdStr, err := nodeId.ToString() + if err != nil { + return nil, err + } + + uri, exists := s.uris[nodIdStr] + if !exists { + s.node.Logger().Error("Could not find uri for node id", zap.String("nodeId", nodIdStr)) + continue + } + + return NewSignedStorageLocation(nodeId, uri), nil + } + + s.isWaitingForUri = true + if s.isTimedOut { + hashStr, err := s.hash.ToString() + if err != nil { + return nil, err + } + return nil, fmt.Errorf("Could not download raw file: Timed out after %s %s", s.timeoutDuration.String(), hashStr) + } + + time.Sleep(10 * time.Millisecond) // Replace with a proper wait/notify mechanism if applicable + } +} + +func (s *StorageLocationProviderImpl) Upvote(uri interfaces.SignedStorageLocation) error { + err := s.node.Services().P2P().UpVote(uri.NodeId()) + if err != nil { + return err + } + + return nil +} + +func (s *StorageLocationProviderImpl) Downvote(uri interfaces.SignedStorageLocation) error { + err := s.node.Services().P2P().DownVote(uri.NodeId()) + if err != nil { + return err + } + return nil +} + +func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash, locationTypes []types.StorageLocationType) interfaces.StorageLocationProvider { + if locationTypes == nil { + locationTypes = []types.StorageLocationType{ + types.StorageLocationTypeFull, + } + } + + return &StorageLocationProviderImpl{ + node: node, + hash: hash, + types: locationTypes, + timeoutDuration: 60 * time.Second, + uris: make(map[string]interfaces.StorageLocation), + } +} +func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool { + for _, v := range slice { + if bytes.Equal(v.Bytes(), item.Bytes()) { + return true + } + } + return false +}