core: Replace old LockerDataStore with new Locker

This commit is contained in:
Marius 2019-09-12 12:37:43 +02:00
parent 92826b171d
commit 7d639b930d
14 changed files with 135 additions and 83 deletions

View File

@ -4,6 +4,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"github.com/tus/tusd/pkg/filelocker"
"github.com/tus/tusd/pkg/filestore" "github.com/tus/tusd/pkg/filestore"
"github.com/tus/tusd/pkg/gcsstore" "github.com/tus/tusd/pkg/gcsstore"
"github.com/tus/tusd/pkg/handler" "github.com/tus/tusd/pkg/handler"
@ -74,7 +75,9 @@ func CreateComposer() {
store := filestore.New(dir) store := filestore.New(dir)
store.UseIn(Composer) store.UseIn(Composer)
// TODO: Add locker
locker := filelocker.New(dir)
locker.UseIn(Composer)
} }
stdout.Printf("Using %.2fMB as maximum size.\n", float64(Flags.MaxSize)/1024/1024) stdout.Printf("Using %.2fMB as maximum size.\n", float64(Flags.MaxSize)/1024/1024)

View File

@ -83,8 +83,7 @@ func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locke
// UseIn adds this locker to the passed composer. // UseIn adds this locker to the passed composer.
func (locker *Etcd3Locker) UseIn(composer *handler.StoreComposer) { func (locker *Etcd3Locker) UseIn(composer *handler.StoreComposer) {
// TODO: Add back UseIn method composer.UseLocker(locker)
//composer.UseLocker(locker)
} }
func (locker *Etcd3Locker) NewLock(id string) (handler.Lock, error) { func (locker *Etcd3Locker) NewLock(id string) (handler.Lock, error) {

View File

@ -35,7 +35,10 @@ func New(path string) FileLocker {
return FileLocker{path} return FileLocker{path}
} }
// TODO: Add UseIn method // UseIn adds this locker to the passed composer.
func (locker FileLocker) UseIn(composer *handler.StoreComposer) {
composer.UseLocker(locker)
}
func (locker FileLocker) NewLock(id string) (handler.Lock, error) { func (locker FileLocker) NewLock(id string) (handler.Lock, error) {
path, err := filepath.Abs(filepath.Join(locker.Path, id+".lock")) path, err := filepath.Abs(filepath.Join(locker.Path, id+".lock"))

View File

@ -9,7 +9,7 @@ type StoreComposer struct {
UsesTerminater bool UsesTerminater bool
Terminater TerminaterDataStore Terminater TerminaterDataStore
UsesLocker bool UsesLocker bool
Locker LockerDataStore Locker Locker
UsesConcater bool UsesConcater bool
Concater ConcaterDataStore Concater ConcaterDataStore
UsesLengthDeferrer bool UsesLengthDeferrer bool
@ -71,7 +71,7 @@ func (store *StoreComposer) UseTerminater(ext TerminaterDataStore) {
store.Terminater = ext store.Terminater = ext
} }
func (store *StoreComposer) UseLocker(ext LockerDataStore) { func (store *StoreComposer) UseLocker(ext Locker) {
store.UsesLocker = ext != nil store.UsesLocker = ext != nil
store.Locker = ext store.Locker = ext
} }

View File

@ -101,25 +101,6 @@ type TerminaterDataStore interface {
AsTerminatableUpload(upload Upload) TerminatableUpload AsTerminatableUpload(upload Upload) TerminatableUpload
} }
// LockerDataStore is the interface required for custom lock persisting mechanisms.
// Common ways to store this information is in memory, on disk or using an
// external service, such as ZooKeeper.
// When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a synchronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper
// order of chunks inside a single upload.
type LockerDataStore interface {
// LockUpload attempts to obtain an exclusive lock for the upload specified
// by its id.
// If this operation fails because the resource is already locked, the
// tusd.ErrFileLocked must be returned. If no error is returned, the attempt
// is consider to be successful and the upload to be locked until UnlockUpload
// is invoked for the same upload.
LockUpload(id string) error
// UnlockUpload releases an existing lock for the given upload.
UnlockUpload(id string) error
}
// ConcaterDataStore is the interface required to be implemented if the // ConcaterDataStore is the interface required to be implemented if the
// Concatenation extension should be enabled. Only in this case, the handler // Concatenation extension should be enabled. Only in this case, the handler
// will parse and respect the Upload-Concat header. // will parse and respect the Upload-Concat header.
@ -147,7 +128,7 @@ type LengthDeclarableUpload interface {
// Locker is the interface required for custom lock persisting mechanisms. // Locker is the interface required for custom lock persisting mechanisms.
// Common ways to store this information is in memory, on disk or using an // Common ways to store this information is in memory, on disk or using an
// external service, such as ZooKeeper. // external service, such as Redis.
// When multiple processes are attempting to access an upload, whether it be // When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a synchronization mechanism is required to prevent // by reading or writing, a synchronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper // data corruption, especially to ensure correct offset values and the proper

View File

@ -27,11 +27,13 @@ func TestGet(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
locker := NewMockLocker(ctrl) locker := NewMockFullLocker(ctrl)
lock := NewMockFullLock(ctrl)
upload := NewMockFullUpload(ctrl) upload := NewMockFullUpload(ctrl)
gomock.InOrder( gomock.InOrder(
locker.EXPECT().LockUpload("yes"), locker.EXPECT().NewLock("yes").Return(lock, nil),
lock.EXPECT().Lock().Return(nil),
store.EXPECT().GetUpload("yes").Return(upload, nil), store.EXPECT().GetUpload("yes").Return(upload, nil),
upload.EXPECT().GetInfo().Return(FileInfo{ upload.EXPECT().GetInfo().Return(FileInfo{
Offset: 5, Offset: 5,
@ -42,7 +44,7 @@ func TestGet(t *testing.T) {
}, },
}, nil), }, nil),
upload.EXPECT().GetReader().Return(reader, nil), upload.EXPECT().GetReader().Return(reader, nil),
locker.EXPECT().UnlockUpload("yes"), lock.EXPECT().Unlock().Return(nil),
) )
composer = NewStoreComposer() composer = NewStoreComposer()

View File

@ -216,53 +216,91 @@ func (mr *MockFullUploadMockRecorder) DeclareLength(length interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeclareLength", reflect.TypeOf((*MockFullUpload)(nil).DeclareLength), length) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeclareLength", reflect.TypeOf((*MockFullUpload)(nil).DeclareLength), length)
} }
// MockLocker is a mock of Locker interface // MockFullLocker is a mock of FullLocker interface
type MockLocker struct { type MockFullLocker struct {
ctrl *gomock.Controller ctrl *gomock.Controller
recorder *MockLockerMockRecorder recorder *MockFullLockerMockRecorder
} }
// MockLockerMockRecorder is the mock recorder for MockLocker // MockFullLockerMockRecorder is the mock recorder for MockFullLocker
type MockLockerMockRecorder struct { type MockFullLockerMockRecorder struct {
mock *MockLocker mock *MockFullLocker
} }
// NewMockLocker creates a new mock instance // NewMockFullLocker creates a new mock instance
func NewMockLocker(ctrl *gomock.Controller) *MockLocker { func NewMockFullLocker(ctrl *gomock.Controller) *MockFullLocker {
mock := &MockLocker{ctrl: ctrl} mock := &MockFullLocker{ctrl: ctrl}
mock.recorder = &MockLockerMockRecorder{mock} mock.recorder = &MockFullLockerMockRecorder{mock}
return mock return mock
} }
// EXPECT returns an object that allows the caller to indicate expected use // EXPECT returns an object that allows the caller to indicate expected use
func (m *MockLocker) EXPECT() *MockLockerMockRecorder { func (m *MockFullLocker) EXPECT() *MockFullLockerMockRecorder {
return m.recorder return m.recorder
} }
// LockUpload mocks base method // NewLock mocks base method
func (m *MockLocker) LockUpload(id string) error { func (m *MockFullLocker) NewLock(id string) (handler.Lock, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LockUpload", id) ret := m.ctrl.Call(m, "NewLock", id)
ret0, _ := ret[0].(handler.Lock)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// NewLock indicates an expected call of NewLock
func (mr *MockFullLockerMockRecorder) NewLock(id interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewLock", reflect.TypeOf((*MockFullLocker)(nil).NewLock), id)
}
// MockFullLock is a mock of FullLock interface
type MockFullLock struct {
ctrl *gomock.Controller
recorder *MockFullLockMockRecorder
}
// MockFullLockMockRecorder is the mock recorder for MockFullLock
type MockFullLockMockRecorder struct {
mock *MockFullLock
}
// NewMockFullLock creates a new mock instance
func NewMockFullLock(ctrl *gomock.Controller) *MockFullLock {
mock := &MockFullLock{ctrl: ctrl}
mock.recorder = &MockFullLockMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockFullLock) EXPECT() *MockFullLockMockRecorder {
return m.recorder
}
// Lock mocks base method
func (m *MockFullLock) Lock() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Lock")
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// LockUpload indicates an expected call of LockUpload // Lock indicates an expected call of Lock
func (mr *MockLockerMockRecorder) LockUpload(id interface{}) *gomock.Call { func (mr *MockFullLockMockRecorder) Lock() *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockUpload", reflect.TypeOf((*MockLocker)(nil).LockUpload), id) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockFullLock)(nil).Lock))
} }
// UnlockUpload mocks base method // Unlock mocks base method
func (m *MockLocker) UnlockUpload(id string) error { func (m *MockFullLock) Unlock() error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UnlockUpload", id) ret := m.ctrl.Call(m, "Unlock")
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// UnlockUpload indicates an expected call of UnlockUpload // Unlock indicates an expected call of Unlock
func (mr *MockLockerMockRecorder) UnlockUpload(id interface{}) *gomock.Call { func (mr *MockFullLockMockRecorder) Unlock() *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnlockUpload", reflect.TypeOf((*MockLocker)(nil).UnlockUpload), id) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockFullLock)(nil).Unlock))
} }

View File

@ -13,11 +13,13 @@ func TestHead(t *testing.T) {
SubTest(t, "Status", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "Status", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
locker := NewMockLocker(ctrl) locker := NewMockFullLocker(ctrl)
lock := NewMockFullLock(ctrl)
upload := NewMockFullUpload(ctrl) upload := NewMockFullUpload(ctrl)
gomock.InOrder( gomock.InOrder(
locker.EXPECT().LockUpload("yes"), locker.EXPECT().NewLock("yes").Return(lock, nil),
lock.EXPECT().Lock().Return(nil),
store.EXPECT().GetUpload("yes").Return(upload, nil), store.EXPECT().GetUpload("yes").Return(upload, nil),
upload.EXPECT().GetInfo().Return(FileInfo{ upload.EXPECT().GetInfo().Return(FileInfo{
Offset: 11, Offset: 11,
@ -27,7 +29,7 @@ func TestHead(t *testing.T) {
"type": "image/png", "type": "image/png",
}, },
}, nil), }, nil),
locker.EXPECT().UnlockUpload("yes"), lock.EXPECT().Unlock().Return(nil),
) )
composer = NewStoreComposer() composer = NewStoreComposer()

View File

@ -447,11 +447,13 @@ func TestPatch(t *testing.T) {
SubTest(t, "Locker", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "Locker", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
locker := NewMockLocker(ctrl) locker := NewMockFullLocker(ctrl)
lock := NewMockFullLock(ctrl)
upload := NewMockFullUpload(ctrl) upload := NewMockFullUpload(ctrl)
gomock.InOrder( gomock.InOrder(
locker.EXPECT().LockUpload("yes").Return(nil), locker.EXPECT().NewLock("yes").Return(lock, nil),
lock.EXPECT().Lock().Return(nil),
store.EXPECT().GetUpload("yes").Return(upload, nil), store.EXPECT().GetUpload("yes").Return(upload, nil),
upload.EXPECT().GetInfo().Return(FileInfo{ upload.EXPECT().GetInfo().Return(FileInfo{
ID: "yes", ID: "yes",
@ -459,7 +461,7 @@ func TestPatch(t *testing.T) {
Size: 20, Size: 20,
}, nil), }, nil),
upload.EXPECT().WriteChunk(int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), upload.EXPECT().WriteChunk(int64(0), NewReaderMatcher("hello")).Return(int64(5), nil),
locker.EXPECT().UnlockUpload("yes").Return(nil), lock.EXPECT().Unlock().Return(nil),
) )
composer = NewStoreComposer() composer = NewStoreComposer()

View File

@ -353,7 +353,8 @@ func TestPost(t *testing.T) {
SubTest(t, "Create", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "Create", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
locker := NewMockLocker(ctrl) locker := NewMockFullLocker(ctrl)
lock := NewMockFullLock(ctrl)
upload := NewMockFullUpload(ctrl) upload := NewMockFullUpload(ctrl)
gomock.InOrder( gomock.InOrder(
@ -372,9 +373,10 @@ func TestPost(t *testing.T) {
"bar": "world", "bar": "world",
}, },
}, nil), }, nil),
locker.EXPECT().LockUpload("foo"), locker.EXPECT().NewLock("foo").Return(lock, nil),
lock.EXPECT().Lock().Return(nil),
upload.EXPECT().WriteChunk(int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), upload.EXPECT().WriteChunk(int64(0), NewReaderMatcher("hello")).Return(int64(5), nil),
locker.EXPECT().UnlockUpload("foo"), lock.EXPECT().Unlock().Return(nil),
) )
composer = NewStoreComposer() composer = NewStoreComposer()

View File

@ -32,11 +32,13 @@ func TestTerminate(t *testing.T) {
SubTest(t, "Termination", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "Termination", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
locker := NewMockLocker(ctrl) locker := NewMockFullLocker(ctrl)
lock := NewMockFullLock(ctrl)
upload := NewMockFullUpload(ctrl) upload := NewMockFullUpload(ctrl)
gomock.InOrder( gomock.InOrder(
locker.EXPECT().LockUpload("foo"), locker.EXPECT().NewLock("foo").Return(lock, nil),
lock.EXPECT().Lock().Return(nil),
store.EXPECT().GetUpload("foo").Return(upload, nil), store.EXPECT().GetUpload("foo").Return(upload, nil),
upload.EXPECT().GetInfo().Return(FileInfo{ upload.EXPECT().GetInfo().Return(FileInfo{
ID: "foo", ID: "foo",
@ -44,7 +46,7 @@ func TestTerminate(t *testing.T) {
}, nil), }, nil),
store.EXPECT().AsTerminatableUpload(upload).Return(upload), store.EXPECT().AsTerminatableUpload(upload).Return(upload),
upload.EXPECT().Terminate().Return(nil), upload.EXPECT().Terminate().Return(nil),
locker.EXPECT().UnlockUpload("foo"), lock.EXPECT().Unlock().Return(nil),
) )
composer = NewStoreComposer() composer = NewStoreComposer()

View File

@ -344,13 +344,13 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
if containsChunk { if containsChunk {
if handler.composer.UsesLocker { if handler.composer.UsesLocker {
locker := handler.composer.Locker lock, err := handler.lockUpload(id)
if err := locker.LockUpload(id); err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
} }
defer locker.UnlockUpload(id) defer lock.Unlock()
} }
if err := handler.writeChunk(upload, info, w, r); err != nil { if err := handler.writeChunk(upload, info, w, r); err != nil {
@ -377,13 +377,13 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request)
} }
if handler.composer.UsesLocker { if handler.composer.UsesLocker {
locker := handler.composer.Locker lock, err := handler.lockUpload(id)
if err := locker.LockUpload(id); err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
} }
defer locker.UnlockUpload(id) defer lock.Unlock()
} }
upload, err := handler.composer.Core.GetUpload(id) upload, err := handler.composer.Core.GetUpload(id)
@ -453,13 +453,13 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
} }
if handler.composer.UsesLocker { if handler.composer.UsesLocker {
locker := handler.composer.Locker lock, err := handler.lockUpload(id)
if err := locker.LockUpload(id); err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
} }
defer locker.UnlockUpload(id) defer lock.Unlock()
} }
upload, err := handler.composer.Core.GetUpload(id) upload, err := handler.composer.Core.GetUpload(id)
@ -653,13 +653,13 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
} }
if handler.composer.UsesLocker { if handler.composer.UsesLocker {
locker := handler.composer.Locker lock, err := handler.lockUpload(id)
if err := locker.LockUpload(id); err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
} }
defer locker.UnlockUpload(id) defer lock.Unlock()
} }
upload, err := handler.composer.Core.GetUpload(id) upload, err := handler.composer.Core.GetUpload(id)
@ -778,13 +778,13 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
} }
if handler.composer.UsesLocker { if handler.composer.UsesLocker {
locker := handler.composer.Locker lock, err := handler.lockUpload(id)
if err := locker.LockUpload(id); err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
} }
defer locker.UnlockUpload(id) defer lock.Unlock()
} }
upload, err := handler.composer.Core.GetUpload(id) upload, err := handler.composer.Core.GetUpload(id)
@ -1031,6 +1031,21 @@ func (handler *UnroutedHandler) validateNewUploadLengthHeaders(uploadLengthHeade
return return
} }
// lockUpload creates a new lock for the given upload ID and attempts to lock it.
// The created lock is returned if it was aquired successfully.
func (handler *UnroutedHandler) lockUpload(id string) (Lock, error) {
lock, err := handler.composer.Locker.NewLock(id)
if err != nil {
return nil, err
}
if err := lock.Lock(); err != nil {
return nil, err
}
return lock, nil
}
// ParseMetadataHeader parses the Upload-Metadata header as defined in the // ParseMetadataHeader parses the Upload-Metadata header as defined in the
// File Creation extension. // File Creation extension.
// e.g. Upload-Metadata: name bHVucmpzLnBuZw==,type aW1hZ2UvcG5n // e.g. Upload-Metadata: name bHVucmpzLnBuZw==,type aW1hZ2UvcG5n

View File

@ -34,8 +34,12 @@ type FullUpload interface {
handler.LengthDeclarableUpload handler.LengthDeclarableUpload
} }
type Locker interface { type FullLocker interface {
handler.LockerDataStore handler.Locker
}
type FullLock interface {
handler.Lock
} }
type httpTest struct { type httpTest struct {

View File

@ -33,8 +33,7 @@ func New() *MemoryLocker {
// UseIn adds this locker to the passed composer. // UseIn adds this locker to the passed composer.
func (locker *MemoryLocker) UseIn(composer *handler.StoreComposer) { func (locker *MemoryLocker) UseIn(composer *handler.StoreComposer) {
// TOOD: Add back composer.UseLocker(locker)
//composer.UseLocker(locker)
} }
func (locker *MemoryLocker) NewLock(id string) (handler.Lock, error) { func (locker *MemoryLocker) NewLock(id string) (handler.Lock, error) {