diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index 6709ddc..95d8a4b 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" + "github.com/tus/tusd/pkg/filelocker" "github.com/tus/tusd/pkg/filestore" "github.com/tus/tusd/pkg/gcsstore" "github.com/tus/tusd/pkg/handler" @@ -74,7 +75,9 @@ func CreateComposer() { store := filestore.New(dir) store.UseIn(Composer) - // TODO: Add locker + + locker := filelocker.New(dir) + locker.UseIn(Composer) } stdout.Printf("Using %.2fMB as maximum size.\n", float64(Flags.MaxSize)/1024/1024) diff --git a/pkg/etcd3locker/locker.go b/pkg/etcd3locker/locker.go index 84f999d..95e2c19 100644 --- a/pkg/etcd3locker/locker.go +++ b/pkg/etcd3locker/locker.go @@ -83,8 +83,7 @@ func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locke // UseIn adds this locker to the passed composer. func (locker *Etcd3Locker) UseIn(composer *handler.StoreComposer) { - // TODO: Add back UseIn method - //composer.UseLocker(locker) + composer.UseLocker(locker) } func (locker *Etcd3Locker) NewLock(id string) (handler.Lock, error) { diff --git a/pkg/filelocker/filelocker.go b/pkg/filelocker/filelocker.go index a1322af..7c34a21 100644 --- a/pkg/filelocker/filelocker.go +++ b/pkg/filelocker/filelocker.go @@ -35,7 +35,10 @@ func New(path string) FileLocker { return FileLocker{path} } -// TODO: Add UseIn method +// UseIn adds this locker to the passed composer. +func (locker FileLocker) UseIn(composer *handler.StoreComposer) { + composer.UseLocker(locker) +} func (locker FileLocker) NewLock(id string) (handler.Lock, error) { path, err := filepath.Abs(filepath.Join(locker.Path, id+".lock")) diff --git a/pkg/handler/composer.go b/pkg/handler/composer.go index 251bae0..abea0a6 100644 --- a/pkg/handler/composer.go +++ b/pkg/handler/composer.go @@ -9,7 +9,7 @@ type StoreComposer struct { UsesTerminater bool Terminater TerminaterDataStore UsesLocker bool - Locker LockerDataStore + Locker Locker UsesConcater bool Concater ConcaterDataStore UsesLengthDeferrer bool @@ -71,7 +71,7 @@ func (store *StoreComposer) UseTerminater(ext TerminaterDataStore) { store.Terminater = ext } -func (store *StoreComposer) UseLocker(ext LockerDataStore) { +func (store *StoreComposer) UseLocker(ext Locker) { store.UsesLocker = ext != nil store.Locker = ext } diff --git a/pkg/handler/datastore.go b/pkg/handler/datastore.go index ab6bd57..19db703 100644 --- a/pkg/handler/datastore.go +++ b/pkg/handler/datastore.go @@ -101,25 +101,6 @@ type TerminaterDataStore interface { AsTerminatableUpload(upload Upload) TerminatableUpload } -// LockerDataStore is the interface required for custom lock persisting mechanisms. -// Common ways to store this information is in memory, on disk or using an -// external service, such as ZooKeeper. -// When multiple processes are attempting to access an upload, whether it be -// by reading or writing, a synchronization mechanism is required to prevent -// data corruption, especially to ensure correct offset values and the proper -// order of chunks inside a single upload. -type LockerDataStore interface { - // LockUpload attempts to obtain an exclusive lock for the upload specified - // by its id. - // If this operation fails because the resource is already locked, the - // tusd.ErrFileLocked must be returned. If no error is returned, the attempt - // is consider to be successful and the upload to be locked until UnlockUpload - // is invoked for the same upload. - LockUpload(id string) error - // UnlockUpload releases an existing lock for the given upload. - UnlockUpload(id string) error -} - // ConcaterDataStore is the interface required to be implemented if the // Concatenation extension should be enabled. Only in this case, the handler // will parse and respect the Upload-Concat header. @@ -147,7 +128,7 @@ type LengthDeclarableUpload interface { // Locker is the interface required for custom lock persisting mechanisms. // Common ways to store this information is in memory, on disk or using an -// external service, such as ZooKeeper. +// external service, such as Redis. // When multiple processes are attempting to access an upload, whether it be // by reading or writing, a synchronization mechanism is required to prevent // data corruption, especially to ensure correct offset values and the proper diff --git a/pkg/handler/get_test.go b/pkg/handler/get_test.go index 0d486e1..67085b8 100644 --- a/pkg/handler/get_test.go +++ b/pkg/handler/get_test.go @@ -27,11 +27,13 @@ func TestGet(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - locker := NewMockLocker(ctrl) + locker := NewMockFullLocker(ctrl) + lock := NewMockFullLock(ctrl) upload := NewMockFullUpload(ctrl) gomock.InOrder( - locker.EXPECT().LockUpload("yes"), + locker.EXPECT().NewLock("yes").Return(lock, nil), + lock.EXPECT().Lock().Return(nil), store.EXPECT().GetUpload("yes").Return(upload, nil), upload.EXPECT().GetInfo().Return(FileInfo{ Offset: 5, @@ -42,7 +44,7 @@ func TestGet(t *testing.T) { }, }, nil), upload.EXPECT().GetReader().Return(reader, nil), - locker.EXPECT().UnlockUpload("yes"), + lock.EXPECT().Unlock().Return(nil), ) composer = NewStoreComposer() diff --git a/pkg/handler/handler_mock_test.go b/pkg/handler/handler_mock_test.go index 970d57a..5cde462 100644 --- a/pkg/handler/handler_mock_test.go +++ b/pkg/handler/handler_mock_test.go @@ -216,53 +216,91 @@ func (mr *MockFullUploadMockRecorder) DeclareLength(length interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeclareLength", reflect.TypeOf((*MockFullUpload)(nil).DeclareLength), length) } -// MockLocker is a mock of Locker interface -type MockLocker struct { +// MockFullLocker is a mock of FullLocker interface +type MockFullLocker struct { ctrl *gomock.Controller - recorder *MockLockerMockRecorder + recorder *MockFullLockerMockRecorder } -// MockLockerMockRecorder is the mock recorder for MockLocker -type MockLockerMockRecorder struct { - mock *MockLocker +// MockFullLockerMockRecorder is the mock recorder for MockFullLocker +type MockFullLockerMockRecorder struct { + mock *MockFullLocker } -// NewMockLocker creates a new mock instance -func NewMockLocker(ctrl *gomock.Controller) *MockLocker { - mock := &MockLocker{ctrl: ctrl} - mock.recorder = &MockLockerMockRecorder{mock} +// NewMockFullLocker creates a new mock instance +func NewMockFullLocker(ctrl *gomock.Controller) *MockFullLocker { + mock := &MockFullLocker{ctrl: ctrl} + mock.recorder = &MockFullLockerMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockLocker) EXPECT() *MockLockerMockRecorder { +func (m *MockFullLocker) EXPECT() *MockFullLockerMockRecorder { return m.recorder } -// LockUpload mocks base method -func (m *MockLocker) LockUpload(id string) error { +// NewLock mocks base method +func (m *MockFullLocker) NewLock(id string) (handler.Lock, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LockUpload", id) + ret := m.ctrl.Call(m, "NewLock", id) + ret0, _ := ret[0].(handler.Lock) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewLock indicates an expected call of NewLock +func (mr *MockFullLockerMockRecorder) NewLock(id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewLock", reflect.TypeOf((*MockFullLocker)(nil).NewLock), id) +} + +// MockFullLock is a mock of FullLock interface +type MockFullLock struct { + ctrl *gomock.Controller + recorder *MockFullLockMockRecorder +} + +// MockFullLockMockRecorder is the mock recorder for MockFullLock +type MockFullLockMockRecorder struct { + mock *MockFullLock +} + +// NewMockFullLock creates a new mock instance +func NewMockFullLock(ctrl *gomock.Controller) *MockFullLock { + mock := &MockFullLock{ctrl: ctrl} + mock.recorder = &MockFullLockMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockFullLock) EXPECT() *MockFullLockMockRecorder { + return m.recorder +} + +// Lock mocks base method +func (m *MockFullLock) Lock() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Lock") ret0, _ := ret[0].(error) return ret0 } -// LockUpload indicates an expected call of LockUpload -func (mr *MockLockerMockRecorder) LockUpload(id interface{}) *gomock.Call { +// Lock indicates an expected call of Lock +func (mr *MockFullLockMockRecorder) Lock() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockUpload", reflect.TypeOf((*MockLocker)(nil).LockUpload), id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockFullLock)(nil).Lock)) } -// UnlockUpload mocks base method -func (m *MockLocker) UnlockUpload(id string) error { +// Unlock mocks base method +func (m *MockFullLock) Unlock() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UnlockUpload", id) + ret := m.ctrl.Call(m, "Unlock") ret0, _ := ret[0].(error) return ret0 } -// UnlockUpload indicates an expected call of UnlockUpload -func (mr *MockLockerMockRecorder) UnlockUpload(id interface{}) *gomock.Call { +// Unlock indicates an expected call of Unlock +func (mr *MockFullLockMockRecorder) Unlock() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnlockUpload", reflect.TypeOf((*MockLocker)(nil).UnlockUpload), id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockFullLock)(nil).Unlock)) } diff --git a/pkg/handler/head_test.go b/pkg/handler/head_test.go index a7bc3e6..7874daa 100644 --- a/pkg/handler/head_test.go +++ b/pkg/handler/head_test.go @@ -13,11 +13,13 @@ func TestHead(t *testing.T) { SubTest(t, "Status", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { ctrl := gomock.NewController(t) defer ctrl.Finish() - locker := NewMockLocker(ctrl) + locker := NewMockFullLocker(ctrl) + lock := NewMockFullLock(ctrl) upload := NewMockFullUpload(ctrl) gomock.InOrder( - locker.EXPECT().LockUpload("yes"), + locker.EXPECT().NewLock("yes").Return(lock, nil), + lock.EXPECT().Lock().Return(nil), store.EXPECT().GetUpload("yes").Return(upload, nil), upload.EXPECT().GetInfo().Return(FileInfo{ Offset: 11, @@ -27,7 +29,7 @@ func TestHead(t *testing.T) { "type": "image/png", }, }, nil), - locker.EXPECT().UnlockUpload("yes"), + lock.EXPECT().Unlock().Return(nil), ) composer = NewStoreComposer() diff --git a/pkg/handler/patch_test.go b/pkg/handler/patch_test.go index 44843da..e841c90 100644 --- a/pkg/handler/patch_test.go +++ b/pkg/handler/patch_test.go @@ -447,11 +447,13 @@ func TestPatch(t *testing.T) { SubTest(t, "Locker", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { ctrl := gomock.NewController(t) defer ctrl.Finish() - locker := NewMockLocker(ctrl) + locker := NewMockFullLocker(ctrl) + lock := NewMockFullLock(ctrl) upload := NewMockFullUpload(ctrl) gomock.InOrder( - locker.EXPECT().LockUpload("yes").Return(nil), + locker.EXPECT().NewLock("yes").Return(lock, nil), + lock.EXPECT().Lock().Return(nil), store.EXPECT().GetUpload("yes").Return(upload, nil), upload.EXPECT().GetInfo().Return(FileInfo{ ID: "yes", @@ -459,7 +461,7 @@ func TestPatch(t *testing.T) { Size: 20, }, nil), upload.EXPECT().WriteChunk(int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), - locker.EXPECT().UnlockUpload("yes").Return(nil), + lock.EXPECT().Unlock().Return(nil), ) composer = NewStoreComposer() diff --git a/pkg/handler/post_test.go b/pkg/handler/post_test.go index 406803b..568927f 100644 --- a/pkg/handler/post_test.go +++ b/pkg/handler/post_test.go @@ -353,7 +353,8 @@ func TestPost(t *testing.T) { SubTest(t, "Create", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { ctrl := gomock.NewController(t) defer ctrl.Finish() - locker := NewMockLocker(ctrl) + locker := NewMockFullLocker(ctrl) + lock := NewMockFullLock(ctrl) upload := NewMockFullUpload(ctrl) gomock.InOrder( @@ -372,9 +373,10 @@ func TestPost(t *testing.T) { "bar": "world", }, }, nil), - locker.EXPECT().LockUpload("foo"), + locker.EXPECT().NewLock("foo").Return(lock, nil), + lock.EXPECT().Lock().Return(nil), upload.EXPECT().WriteChunk(int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), - locker.EXPECT().UnlockUpload("foo"), + lock.EXPECT().Unlock().Return(nil), ) composer = NewStoreComposer() diff --git a/pkg/handler/terminate_test.go b/pkg/handler/terminate_test.go index a9acb01..9484e94 100644 --- a/pkg/handler/terminate_test.go +++ b/pkg/handler/terminate_test.go @@ -32,11 +32,13 @@ func TestTerminate(t *testing.T) { SubTest(t, "Termination", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { ctrl := gomock.NewController(t) defer ctrl.Finish() - locker := NewMockLocker(ctrl) + locker := NewMockFullLocker(ctrl) + lock := NewMockFullLock(ctrl) upload := NewMockFullUpload(ctrl) gomock.InOrder( - locker.EXPECT().LockUpload("foo"), + locker.EXPECT().NewLock("foo").Return(lock, nil), + lock.EXPECT().Lock().Return(nil), store.EXPECT().GetUpload("foo").Return(upload, nil), upload.EXPECT().GetInfo().Return(FileInfo{ ID: "foo", @@ -44,7 +46,7 @@ func TestTerminate(t *testing.T) { }, nil), store.EXPECT().AsTerminatableUpload(upload).Return(upload), upload.EXPECT().Terminate().Return(nil), - locker.EXPECT().UnlockUpload("foo"), + lock.EXPECT().Unlock().Return(nil), ) composer = NewStoreComposer() diff --git a/pkg/handler/unrouted_handler.go b/pkg/handler/unrouted_handler.go index 938ff2e..88c9d66 100644 --- a/pkg/handler/unrouted_handler.go +++ b/pkg/handler/unrouted_handler.go @@ -344,13 +344,13 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) if containsChunk { if handler.composer.UsesLocker { - locker := handler.composer.Locker - if err := locker.LockUpload(id); err != nil { + lock, err := handler.lockUpload(id) + if err != nil { handler.sendError(w, r, err) return } - defer locker.UnlockUpload(id) + defer lock.Unlock() } if err := handler.writeChunk(upload, info, w, r); err != nil { @@ -377,13 +377,13 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request) } if handler.composer.UsesLocker { - locker := handler.composer.Locker - if err := locker.LockUpload(id); err != nil { + lock, err := handler.lockUpload(id) + if err != nil { handler.sendError(w, r, err) return } - defer locker.UnlockUpload(id) + defer lock.Unlock() } upload, err := handler.composer.Core.GetUpload(id) @@ -453,13 +453,13 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request } if handler.composer.UsesLocker { - locker := handler.composer.Locker - if err := locker.LockUpload(id); err != nil { + lock, err := handler.lockUpload(id) + if err != nil { handler.sendError(w, r, err) return } - defer locker.UnlockUpload(id) + defer lock.Unlock() } upload, err := handler.composer.Core.GetUpload(id) @@ -653,13 +653,13 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request) } if handler.composer.UsesLocker { - locker := handler.composer.Locker - if err := locker.LockUpload(id); err != nil { + lock, err := handler.lockUpload(id) + if err != nil { handler.sendError(w, r, err) return } - defer locker.UnlockUpload(id) + defer lock.Unlock() } upload, err := handler.composer.Core.GetUpload(id) @@ -778,13 +778,13 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) } if handler.composer.UsesLocker { - locker := handler.composer.Locker - if err := locker.LockUpload(id); err != nil { + lock, err := handler.lockUpload(id) + if err != nil { handler.sendError(w, r, err) return } - defer locker.UnlockUpload(id) + defer lock.Unlock() } upload, err := handler.composer.Core.GetUpload(id) @@ -1031,6 +1031,21 @@ func (handler *UnroutedHandler) validateNewUploadLengthHeaders(uploadLengthHeade return } +// lockUpload creates a new lock for the given upload ID and attempts to lock it. +// The created lock is returned if it was aquired successfully. +func (handler *UnroutedHandler) lockUpload(id string) (Lock, error) { + lock, err := handler.composer.Locker.NewLock(id) + if err != nil { + return nil, err + } + + if err := lock.Lock(); err != nil { + return nil, err + } + + return lock, nil +} + // ParseMetadataHeader parses the Upload-Metadata header as defined in the // File Creation extension. // e.g. Upload-Metadata: name bHVucmpzLnBuZw==,type aW1hZ2UvcG5n diff --git a/pkg/handler/utils_test.go b/pkg/handler/utils_test.go index ece9655..aa79491 100644 --- a/pkg/handler/utils_test.go +++ b/pkg/handler/utils_test.go @@ -34,8 +34,12 @@ type FullUpload interface { handler.LengthDeclarableUpload } -type Locker interface { - handler.LockerDataStore +type FullLocker interface { + handler.Locker +} + +type FullLock interface { + handler.Lock } type httpTest struct { diff --git a/pkg/memorylocker/memorylocker.go b/pkg/memorylocker/memorylocker.go index f2ac94e..932cc5a 100644 --- a/pkg/memorylocker/memorylocker.go +++ b/pkg/memorylocker/memorylocker.go @@ -33,8 +33,7 @@ func New() *MemoryLocker { // UseIn adds this locker to the passed composer. func (locker *MemoryLocker) UseIn(composer *handler.StoreComposer) { - // TOOD: Add back - //composer.UseLocker(locker) + composer.UseLocker(locker) } func (locker *MemoryLocker) NewLock(id string) (handler.Lock, error) {