feat: implement StorageLocationProvider
This commit is contained in:
parent
bb68bf3be1
commit
6c2ebb1152
|
@ -2,6 +2,13 @@ package interfaces
|
|||
|
||||
//go:generate mockgen -source=storage.go -destination=../mocks/interfaces/storage.go -package=interfaces
|
||||
|
||||
type StorageLocationProvider interface {
|
||||
Start() error
|
||||
Next() (SignedStorageLocation, error)
|
||||
Upvote(uri SignedStorageLocation) error
|
||||
Downvote(uri SignedStorageLocation) error
|
||||
}
|
||||
|
||||
type StorageLocation interface {
|
||||
BytesURL() string
|
||||
OutboardBytesURL() string
|
||||
|
|
|
@ -155,3 +155,178 @@ func (s StorageLocationMap) EncodeMsgpack(enc *msgpack.Encoder) error {
|
|||
func NewStorageLocationMap() StorageLocationMap {
|
||||
return StorageLocationMap{}
|
||||
}
|
||||
|
||||
type StorageLocationProviderImpl struct {
|
||||
node interfaces.Node
|
||||
hash *encoding.Multihash
|
||||
types []types.StorageLocationType
|
||||
timeoutDuration time.Duration
|
||||
availableNodes []*encoding.NodeId
|
||||
uris map[string]interfaces.StorageLocation
|
||||
timeout time.Time
|
||||
isTimedOut bool
|
||||
isWaitingForUri bool
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
func (s *StorageLocationProviderImpl) Start() error {
|
||||
var err error
|
||||
|
||||
s.uris, err = s.node.GetCachedStorageLocations(s.hash, s.types)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.mutex.Lock()
|
||||
s.availableNodes = make([]*encoding.NodeId, 0, len(s.uris))
|
||||
for k := range s.uris {
|
||||
nodeId, err := encoding.DecodeNodeId(k)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
s.availableNodes = append(s.availableNodes, nodeId)
|
||||
|
||||
}
|
||||
|
||||
s.availableNodes, err = s.node.Services().P2P().SortNodesByScore(s.availableNodes)
|
||||
if err != nil {
|
||||
s.mutex.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
s.timeout = time.Now().Add(s.timeoutDuration)
|
||||
s.isTimedOut = false
|
||||
s.mutex.Unlock()
|
||||
go func() {
|
||||
requestSent := false
|
||||
|
||||
for {
|
||||
s.mutex.Lock()
|
||||
if time.Now().After(s.timeout) {
|
||||
s.isTimedOut = true
|
||||
s.mutex.Unlock()
|
||||
break
|
||||
}
|
||||
|
||||
newUris, err := s.node.GetCachedStorageLocations(s.hash, s.types)
|
||||
if err != nil {
|
||||
s.mutex.Unlock()
|
||||
break
|
||||
}
|
||||
|
||||
if len(s.availableNodes) == 0 && len(newUris) < 2 && !requestSent {
|
||||
err := s.node.Services().P2P().SendHashRequest(s.hash, s.types)
|
||||
if err != nil {
|
||||
s.node.Logger().Error("Error sending hash request", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
requestSent = true
|
||||
}
|
||||
|
||||
hasNewNode := false
|
||||
for k, v := range newUris {
|
||||
if _, exists := s.uris[k]; !exists || s.uris[k] != v {
|
||||
s.uris[k] = v
|
||||
nodeId, err := encoding.DecodeNodeId(k)
|
||||
if err != nil {
|
||||
s.node.Logger().Error("Error decoding node id", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if !containsNode(s.availableNodes, nodeId) {
|
||||
s.availableNodes = append(s.availableNodes, nodeId)
|
||||
hasNewNode = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if hasNewNode {
|
||||
score, err := s.node.Services().P2P().SortNodesByScore(s.availableNodes)
|
||||
if err != nil {
|
||||
s.node.Logger().Error("Error sorting nodes by score", zap.Error(err))
|
||||
} else {
|
||||
s.availableNodes = score
|
||||
}
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
func (s *StorageLocationProviderImpl) Next() (interfaces.SignedStorageLocation, error) {
|
||||
s.timeout = time.Now().Add(s.timeoutDuration)
|
||||
|
||||
for {
|
||||
if len(s.availableNodes) > 0 {
|
||||
s.isWaitingForUri = false
|
||||
nodeId := s.availableNodes[0]
|
||||
s.availableNodes = s.availableNodes[1:]
|
||||
|
||||
nodIdStr, err := nodeId.ToString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
uri, exists := s.uris[nodIdStr]
|
||||
if !exists {
|
||||
s.node.Logger().Error("Could not find uri for node id", zap.String("nodeId", nodIdStr))
|
||||
continue
|
||||
}
|
||||
|
||||
return NewSignedStorageLocation(nodeId, uri), nil
|
||||
}
|
||||
|
||||
s.isWaitingForUri = true
|
||||
if s.isTimedOut {
|
||||
hashStr, err := s.hash.ToString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("Could not download raw file: Timed out after %s %s", s.timeoutDuration.String(), hashStr)
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond) // Replace with a proper wait/notify mechanism if applicable
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StorageLocationProviderImpl) Upvote(uri interfaces.SignedStorageLocation) error {
|
||||
err := s.node.Services().P2P().UpVote(uri.NodeId())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *StorageLocationProviderImpl) Downvote(uri interfaces.SignedStorageLocation) error {
|
||||
err := s.node.Services().P2P().DownVote(uri.NodeId())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash, locationTypes []types.StorageLocationType) interfaces.StorageLocationProvider {
|
||||
if locationTypes == nil {
|
||||
locationTypes = []types.StorageLocationType{
|
||||
types.StorageLocationTypeFull,
|
||||
}
|
||||
}
|
||||
|
||||
return &StorageLocationProviderImpl{
|
||||
node: node,
|
||||
hash: hash,
|
||||
types: locationTypes,
|
||||
timeoutDuration: 60 * time.Second,
|
||||
uris: make(map[string]interfaces.StorageLocation),
|
||||
}
|
||||
}
|
||||
func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool {
|
||||
for _, v := range slice {
|
||||
if bytes.Equal(v.Bytes(), item.Bytes()) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue