From e52139f977a849d2a89ca14bdb68cafabc867953 Mon Sep 17 00:00:00 2001 From: Marius Date: Sat, 19 Mar 2022 23:21:17 +0100 Subject: [PATCH] v2: Implement cancelable lock mechanism (#667) * handler: Implement prototype of new locking back-end * memorylocker2: Switch to channel for release notification * handler: Update locker interface * handler: Add method to close body with error * memorylocker: Replace with new implementation * filelocker: Adjust methods to match interface * handler: Introduce new httpContext * handler: Implement upload interruption * handler: Adjust tests to new inferfaces * handler, memorylocker: Cancel context to avoid leaks --- go.mod | 1 + go.sum | 1 + pkg/filelocker/filelocker.go | 4 +- pkg/filelocker/filelocker_test.go | 7 +- pkg/handler/body_reader.go | 11 +- pkg/handler/concat_test.go | 39 +++-- pkg/handler/context.go | 28 ++++ pkg/handler/datastore.go | 14 +- pkg/handler/get_test.go | 21 ++- pkg/handler/handler_mock_test.go | 8 +- pkg/handler/head_test.go | 17 +-- pkg/handler/patch_test.go | 101 ++++++------ pkg/handler/post_test.go | 43 +++--- pkg/handler/terminate_test.go | 9 +- pkg/handler/unrouted_handler.go | 212 ++++++++++++++------------ pkg/memorylocker/memorylocker.go | 59 +++++-- pkg/memorylocker/memorylocker_test.go | 64 +++++++- 17 files changed, 391 insertions(+), 248 deletions(-) create mode 100644 pkg/handler/context.go diff --git a/go.mod b/go.mod index 91ef1a0..9784984 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0 github.com/stretchr/testify v1.7.0 github.com/vimeo/go-util v1.4.1 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect google.golang.org/api v0.70.0 google.golang.org/grpc v1.44.0 gopkg.in/Acconut/lockfile.v1 v1.1.0 diff --git a/go.sum b/go.sum index de9125f..0687ee0 100644 --- a/go.sum +++ b/go.sum @@ -457,6 +457,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/filelocker/filelocker.go b/pkg/filelocker/filelocker.go index 7c34a21..d6b8171 100644 --- a/pkg/filelocker/filelocker.go +++ b/pkg/filelocker/filelocker.go @@ -9,6 +9,7 @@ package filelocker import ( + "context" "os" "path/filepath" @@ -58,7 +59,8 @@ type fileUploadLock struct { file lockfile.Lockfile } -func (lock fileUploadLock) Lock() error { +// TODO: Implement functionality for ctx and requestRelease. 
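+// For now the file-based lock keeps its previous semantics: it tries the
+// lockfile once and returns handler.ErrFileLocked if it is already held,
+// ignoring both the context and the requestRelease callback.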
+func (lock fileUploadLock) Lock(ctx context.Context, requestRelease func()) error { err := lock.file.TryLock() if err == lockfile.ErrBusy { return handler.ErrFileLocked diff --git a/pkg/filelocker/filelocker_test.go b/pkg/filelocker/filelocker_test.go index 8403a0a..ac79c37 100644 --- a/pkg/filelocker/filelocker_test.go +++ b/pkg/filelocker/filelocker_test.go @@ -1,6 +1,7 @@ package filelocker import ( + "context" "io/ioutil" "testing" @@ -21,12 +22,12 @@ func TestFileLocker(t *testing.T) { lock1, err := locker.NewLock("one") a.NoError(err) - a.NoError(lock1.Lock()) - a.Equal(handler.ErrFileLocked, lock1.Lock()) + a.NoError(lock1.Lock(context.TODO(), nil)) + a.Equal(handler.ErrFileLocked, lock1.Lock(context.TODO(), nil)) lock2, err := locker.NewLock("one") a.NoError(err) - a.Equal(handler.ErrFileLocked, lock2.Lock()) + a.Equal(handler.ErrFileLocked, lock2.Lock(context.TODO(), nil)) a.NoError(lock1.Unlock()) } diff --git a/pkg/handler/body_reader.go b/pkg/handler/body_reader.go index 7374b89..67d2f11 100644 --- a/pkg/handler/body_reader.go +++ b/pkg/handler/body_reader.go @@ -14,13 +14,15 @@ import ( // In addition, the bodyReader keeps track of how many bytes were read. type bodyReader struct { reader io.Reader + closer io.Closer err error bytesCounter int64 } -func newBodyReader(r io.Reader) *bodyReader { +func newBodyReader(r io.ReadCloser, maxSize int64) *bodyReader { return &bodyReader{ - reader: r, + reader: io.LimitReader(r, maxSize), + closer: r, } } @@ -51,3 +53,8 @@ func (r bodyReader) hasError() error { func (r *bodyReader) bytesRead() int64 { return atomic.LoadInt64(&r.bytesCounter) } + +func (r *bodyReader) closeWithError(err error) { + r.closer.Close() + r.err = err +} diff --git a/pkg/handler/concat_test.go b/pkg/handler/concat_test.go index 98bca7e..bf846b0 100644 --- a/pkg/handler/concat_test.go +++ b/pkg/handler/concat_test.go @@ -1,7 +1,6 @@ package handler_test import ( - "context" "net/http" "strings" "testing" @@ -38,14 +37,14 @@ func TestConcat(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, IsPartial: true, IsFinal: false, PartialUploads: nil, MetaData: make(map[string]string), }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, IsPartial: true, @@ -77,8 +76,8 @@ func TestConcat(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", IsPartial: true, }, nil), @@ -114,26 +113,26 @@ func TestConcat(t *testing.T) { uploadC := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "a").Return(uploadA, nil), - uploadA.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ IsPartial: true, Size: 5, Offset: 5, }, nil), - store.EXPECT().GetUpload(context.Background(), "b").Return(uploadB, nil), - uploadB.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ 
IsPartial: true, Size: 5, Offset: 5, }, nil), - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 10, IsPartial: false, IsFinal: true, PartialUploads: []string{"a", "b"}, MetaData: make(map[string]string), }).Return(uploadC, nil), - uploadC.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 10, IsPartial: false, @@ -142,7 +141,7 @@ func TestConcat(t *testing.T) { MetaData: make(map[string]string), }, nil), store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC), - uploadC.EXPECT().ConcatUploads(context.Background(), []Upload{uploadA, uploadB}).Return(nil), + uploadC.EXPECT().ConcatUploads(gomock.Any(), []Upload{uploadA, uploadB}).Return(nil), ) handler, _ := NewHandler(Config{ @@ -188,8 +187,8 @@ func TestConcat(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", IsFinal: true, PartialUploads: []string{"a", "b"}, @@ -226,8 +225,8 @@ func TestConcat(t *testing.T) { // This upload is still unfinished (mismatching offset and size) and // will therefore cause the POST request to fail. gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "c").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "c").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "c", IsPartial: true, Size: 5, @@ -256,8 +255,8 @@ func TestConcat(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "huge").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "huge").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "huge", Size: 1000, Offset: 1000, @@ -286,8 +285,8 @@ func TestConcat(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 10, Offset: 0, diff --git a/pkg/handler/context.go b/pkg/handler/context.go new file mode 100644 index 0000000..67050d5 --- /dev/null +++ b/pkg/handler/context.go @@ -0,0 +1,28 @@ +package handler + +import ( + "context" + "net/http" +) + +// httpContext is wrapper around context.Context that also carries the +// corresponding HTTP request and response writer, as well as an +// optional body reader +// TODO: Consider including HTTPResponse as well +type httpContext struct { + context.Context + + res http.ResponseWriter + req *http.Request + body *bodyReader +} + +func newContext(w http.ResponseWriter, r *http.Request) *httpContext { + return &httpContext{ + // TODO: Try to reuse the request's context in the future + Context: context.Background(), + res: w, + req: r, + body: nil, // body can be filled later for PATCH requests + } +} diff --git a/pkg/handler/datastore.go b/pkg/handler/datastore.go index 539eb4e..0f8b6e3 100644 --- a/pkg/handler/datastore.go +++ b/pkg/handler/datastore.go @@ 
-149,11 +149,15 @@ type Locker interface { type Lock interface { // Lock attempts to obtain an exclusive lock for the upload specified // by its id. - // If this operation fails because the resource is already locked, the - // tusd.ErrFileLocked must be returned. If no error is returned, the attempt - // is consider to be successful and the upload to be locked until UnlockUpload - // is invoked for the same upload. - Lock() error + // If the lock can be acquired, it will return without error. The requestUnlock + // callback is invoked when another caller attempts to create a lock. In this + // case, the holder of the lock should attempt to release the lock as soon + // as possible + // If the lock is already held, the holder's requestUnlock function will be + // invoked to request the lock to be released. If the context is cancelled before + // the lock can be acquired, ErrLockTimeout will be returned without acquiring + // the lock. + Lock(ctx context.Context, requestUnlock func()) error // Unlock releases an existing lock for the given upload. Unlock() error } diff --git a/pkg/handler/get_test.go b/pkg/handler/get_test.go index b1d638a..eda2d5b 100644 --- a/pkg/handler/get_test.go +++ b/pkg/handler/get_test.go @@ -1,7 +1,6 @@ package handler_test import ( - "context" "net/http" "strings" "testing" @@ -34,9 +33,9 @@ func TestGet(t *testing.T) { gomock.InOrder( locker.EXPECT().NewLock("yes").Return(lock, nil), - lock.EXPECT().Lock().Return(nil), - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil), + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ Offset: 5, Size: 20, MetaData: map[string]string{ @@ -44,7 +43,7 @@ func TestGet(t *testing.T) { "filetype": "image/jpeg", }, }, nil), - upload.EXPECT().GetReader(context.Background()).Return(reader, nil), + upload.EXPECT().GetReader(gomock.Any()).Return(reader, nil), lock.EXPECT().Unlock().Return(nil), ) @@ -79,8 +78,8 @@ func TestGet(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ Offset: 0, }, nil), ) @@ -107,8 +106,8 @@ func TestGet(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ Offset: 0, MetaData: map[string]string{ "filetype": "non-a-valid-mime-type", @@ -139,8 +138,8 @@ func TestGet(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ Offset: 0, MetaData: map[string]string{ "filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document.v1", diff --git a/pkg/handler/handler_mock_test.go b/pkg/handler/handler_mock_test.go index 062c7fa..3a49eed 100644 --- 
a/pkg/handler/handler_mock_test.go +++ b/pkg/handler/handler_mock_test.go @@ -293,17 +293,17 @@ func (m *MockFullLock) EXPECT() *MockFullLockMockRecorder { } // Lock mocks base method -func (m *MockFullLock) Lock() error { +func (m *MockFullLock) Lock(ctx context.Context, requestUnlock func()) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Lock") + ret := m.ctrl.Call(m, "Lock", ctx, requestUnlock) ret0, _ := ret[0].(error) return ret0 } // Lock indicates an expected call of Lock -func (mr *MockFullLockMockRecorder) Lock() *gomock.Call { +func (mr *MockFullLockMockRecorder) Lock(ctx, requestUnlock interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockFullLock)(nil).Lock)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockFullLock)(nil).Lock), ctx, requestUnlock) } // Unlock mocks base method diff --git a/pkg/handler/head_test.go b/pkg/handler/head_test.go index 353c372..cab231b 100644 --- a/pkg/handler/head_test.go +++ b/pkg/handler/head_test.go @@ -1,7 +1,6 @@ package handler_test import ( - "context" "net/http" "testing" @@ -19,9 +18,9 @@ func TestHead(t *testing.T) { gomock.InOrder( locker.EXPECT().NewLock("yes").Return(lock, nil), - lock.EXPECT().Lock().Return(nil), - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil), + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ Offset: 11, Size: 44, MetaData: map[string]string{ @@ -64,7 +63,7 @@ func TestHead(t *testing.T) { }) SubTest(t, "UploadNotFoundFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { - store.EXPECT().GetUpload(context.Background(), "no").Return(nil, ErrNotFound) + store.EXPECT().GetUpload(gomock.Any(), "no").Return(nil, ErrNotFound) handler, _ := NewHandler(Config{ StoreComposer: composer, @@ -91,8 +90,8 @@ func TestHead(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ SizeIsDeferred: true, Size: 0, }, nil), @@ -121,8 +120,8 @@ func TestHead(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ SizeIsDeferred: false, Size: 10, }, nil), diff --git a/pkg/handler/patch_test.go b/pkg/handler/patch_test.go index 7ea7ee4..3758f8a 100644 --- a/pkg/handler/patch_test.go +++ b/pkg/handler/patch_test.go @@ -1,7 +1,6 @@ package handler_test import ( - "context" "errors" "io" "io/ioutil" @@ -23,14 +22,14 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 5, Size: 10, }, nil), - 
upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil), - upload.EXPECT().FinishUpload(context.Background()), + upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil), + upload.EXPECT().FinishUpload(gomock.Any()), ) handler, _ := NewHandler(Config{ @@ -75,14 +74,14 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 5, Size: 10, }, nil), - upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil), - upload.EXPECT().FinishUpload(context.Background()), + upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil), + upload.EXPECT().FinishUpload(gomock.Any()), ) handler, _ := NewHandler(Config{ @@ -112,8 +111,8 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 20, Size: 20, @@ -141,7 +140,7 @@ func TestPatch(t *testing.T) { }) SubTest(t, "UploadNotFoundFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { - store.EXPECT().GetUpload(context.Background(), "no").Return(nil, ErrNotFound) + store.EXPECT().GetUpload(gomock.Any(), "no").Return(nil, ErrNotFound) handler, _ := NewHandler(Config{ StoreComposer: composer, @@ -165,8 +164,8 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 5, }, nil), @@ -194,8 +193,8 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 5, Size: 10, @@ -268,14 +267,14 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 5, Size: 20, }, nil), - upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil), - upload.EXPECT().FinishUpload(context.Background()), + upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil), + upload.EXPECT().FinishUpload(gomock.Any()), ) handler, _ := NewHandler(Config{ @@ -310,17 +309,17 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) 
gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 5, Size: 0, SizeIsDeferred: true, }, nil), store.EXPECT().AsLengthDeclarableUpload(upload).Return(upload), - upload.EXPECT().DeclareLength(context.Background(), int64(20)), - upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil), - upload.EXPECT().FinishUpload(context.Background()), + upload.EXPECT().DeclareLength(gomock.Any(), int64(20)), + upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil), + upload.EXPECT().FinishUpload(gomock.Any()), ) handler, _ := NewHandler(Config{ @@ -353,16 +352,16 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 20, Size: 0, SizeIsDeferred: true, }, nil), store.EXPECT().AsLengthDeclarableUpload(upload).Return(upload), - upload.EXPECT().DeclareLength(context.Background(), int64(20)), - upload.EXPECT().FinishUpload(context.Background()), + upload.EXPECT().DeclareLength(gomock.Any(), int64(20)), + upload.EXPECT().FinishUpload(gomock.Any()), ) handler, _ := NewHandler(Config{ @@ -392,26 +391,26 @@ func TestPatch(t *testing.T) { upload2 := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload1, nil), - upload1.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload1, nil), + upload1.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 5, Size: 0, SizeIsDeferred: true, }, nil), store.EXPECT().AsLengthDeclarableUpload(upload1).Return(upload1), - upload1.EXPECT().DeclareLength(context.Background(), int64(20)), - upload1.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil), + upload1.EXPECT().DeclareLength(gomock.Any(), int64(20)), + upload1.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil), - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload2, nil), - upload2.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload2, nil), + upload2.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 10, Size: 20, SizeIsDeferred: false, }, nil), - upload2.EXPECT().WriteChunk(context.Background(), int64(10), NewReaderMatcher("thisismore")).Return(int64(10), nil), - upload2.EXPECT().FinishUpload(context.Background()), + upload2.EXPECT().WriteChunk(gomock.Any(), int64(10), NewReaderMatcher("thisismore")).Return(int64(10), nil), + upload2.EXPECT().FinishUpload(gomock.Any()), ) handler, _ := NewHandler(Config{ @@ -460,14 +459,14 @@ func TestPatch(t *testing.T) { gomock.InOrder( locker.EXPECT().NewLock("yes").Return(lock, nil), - lock.EXPECT().Lock().Return(nil), - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + lock.EXPECT().Lock(gomock.Any(), 
gomock.Any()).Return(nil), + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 0, Size: 20, }, nil), - upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), + upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), lock.EXPECT().Unlock().Return(nil), ) @@ -500,13 +499,13 @@ func TestPatch(t *testing.T) { // We simulate that the upload has already an offset of 10 bytes. Therefore, the progress notifications // must be the sum of the exisiting offset and the newly read bytes. gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 10, Size: 100, }, nil), - upload.EXPECT().WriteChunk(context.Background(), int64(10), NewReaderMatcher("first second third")).Return(int64(18), nil), + upload.EXPECT().WriteChunk(gomock.Any(), int64(10), NewReaderMatcher("first second third")).Return(int64(18), nil), ) handler, _ := NewHandler(Config{ @@ -574,15 +573,15 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 0, Size: 100, }, nil), - upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil), + upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil), store.EXPECT().AsTerminatableUpload(upload).Return(upload), - upload.EXPECT().Terminate(context.Background()), + upload.EXPECT().Terminate(gomock.Any()), ) handler, _ := NewHandler(Config{ @@ -644,14 +643,14 @@ func TestPatch(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "yes", Offset: 0, Size: 100, }, nil), // The reader for WriteChunk must not return an error. 
- upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil), + upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil), ) handler, _ := NewHandler(Config{ diff --git a/pkg/handler/post_test.go b/pkg/handler/post_test.go index e18baa2..fb4328d 100644 --- a/pkg/handler/post_test.go +++ b/pkg/handler/post_test.go @@ -2,7 +2,6 @@ package handler_test import ( "bytes" - "context" "net/http" "strings" "testing" @@ -20,7 +19,7 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{ "foo": "hello", @@ -28,7 +27,7 @@ func TestPost(t *testing.T) { "empty": "", }, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{ @@ -76,16 +75,16 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 0, MetaData: map[string]string{}, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 0, MetaData: map[string]string{}, }, nil), - upload.EXPECT().FinishUpload(context.Background()).Return(nil), + upload.EXPECT().FinishUpload(gomock.Any()).Return(nil), ) handler, _ := NewHandler(Config{ @@ -211,11 +210,11 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{}, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{}, @@ -248,11 +247,11 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{}, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{}, @@ -286,11 +285,11 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{}, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{}, @@ -325,11 +324,11 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{}, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{}, @@ -367,14 +366,14 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) 
gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{ "foo": "hello", "bar": "world", }, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{ @@ -383,8 +382,8 @@ func TestPost(t *testing.T) { }, }, nil), locker.EXPECT().NewLock("foo").Return(lock, nil), - lock.EXPECT().Lock().Return(nil), - upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), + lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil), + upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil), lock.EXPECT().Unlock().Return(nil), ) @@ -420,11 +419,11 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{}, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{}, @@ -454,11 +453,11 @@ func TestPost(t *testing.T) { upload := NewMockFullUpload(ctrl) gomock.InOrder( - store.EXPECT().NewUpload(context.Background(), FileInfo{ + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ Size: 300, MetaData: map[string]string{}, }).Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 300, MetaData: map[string]string{}, diff --git a/pkg/handler/terminate_test.go b/pkg/handler/terminate_test.go index 9af30a8..b5121f5 100644 --- a/pkg/handler/terminate_test.go +++ b/pkg/handler/terminate_test.go @@ -1,7 +1,6 @@ package handler_test import ( - "context" "net/http" "testing" @@ -39,14 +38,14 @@ func TestTerminate(t *testing.T) { gomock.InOrder( locker.EXPECT().NewLock("foo").Return(lock, nil), - lock.EXPECT().Lock().Return(nil), - store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil), - upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil), + store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ ID: "foo", Size: 10, }, nil), store.EXPECT().AsTerminatableUpload(upload).Return(upload), - upload.EXPECT().Terminate(context.Background()).Return(nil), + upload.EXPECT().Terminate(gomock.Any()).Return(nil), lock.EXPECT().Unlock().Return(nil), ) diff --git a/pkg/handler/unrouted_handler.go b/pkg/handler/unrouted_handler.go index 0975a05..3433e70 100644 --- a/pkg/handler/unrouted_handler.go +++ b/pkg/handler/unrouted_handler.go @@ -31,6 +31,7 @@ var ( ErrInvalidOffset = NewError("ERR_INVALID_OFFSET", "missing or invalid Upload-Offset header", http.StatusBadRequest) ErrNotFound = NewError("ERR_UPLOAD_NOT_FOUND", "upload not found", http.StatusNotFound) ErrFileLocked = NewError("ERR_UPLOAD_LOCKED", "file currently locked", http.StatusLocked) + ErrLockTimeout = NewError("ERR_LOCK_TIMEOUT", "failed to acquire lock before timeout", http.StatusInternalServerError) ErrMismatchOffset = NewError("ERR_MISMATCHED_OFFSET", "mismatched offset", http.StatusConflict) ErrSizeExceeded = NewError("ERR_UPLOAD_SIZE_EXCEEDED", "upload's size 
exceeded", http.StatusRequestEntityTooLarge) ErrNotImplemented = NewError("ERR_NOT_IMPLEMENTED", "feature not implemented", http.StatusNotImplemented) @@ -41,6 +42,7 @@ var ( ErrInvalidUploadDeferLength = NewError("ERR_INVALID_UPLOAD_LENGTH_DEFER", "invalid Upload-Defer-Length header", http.StatusBadRequest) ErrUploadStoppedByServer = NewError("ERR_UPLOAD_STOPPED", "upload has been stopped by server", http.StatusBadRequest) ErrUploadRejectedByServer = NewError("ERR_UPLOAD_REJECTED", "upload creation has been rejected by server", http.StatusBadRequest) + ErrUploadInterrupted = NewError("ERR_UPLAOD_INTERRUPTED", "upload has been interrupted by another request for this upload resource", http.StatusBadRequest) // TODO: These two responses are 500 for backwards compatability. We should discuss // whether it is better to more them to 4XX status codes. @@ -194,7 +196,8 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler { // will be ignored or interpreted as a rejection. // For example, the Presto engine, which is used in older versions of // Opera, Opera Mobile and Opera Mini, handles CORS this way. - handler.sendResp(w, r, HTTPResponse{ + c := newContext(w, r) + handler.sendResp(c, HTTPResponse{ StatusCode: http.StatusOK, }) return @@ -204,7 +207,8 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler { // GET and HEAD methods are not checked since a browser may visit this URL and does // not include this header. GET requests are not part of the specification. if r.Method != "GET" && r.Method != "HEAD" && r.Header.Get("Tus-Resumable") != "1.0.0" { - handler.sendError(w, r, ErrUnsupportedVersion) + c := newContext(w, r) + handler.sendError(c, ErrUnsupportedVersion) return } @@ -216,7 +220,7 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler { // PostFile creates a new file upload using the datastore after validating the // length and parsing the metadata. func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + c := newContext(w, r) // Check for presence of application/offset+octet-stream. 
If another content // type is defined, it will be ignored and treated as none was set because @@ -233,7 +237,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) // Parse Upload-Concat header isPartial, isFinal, partialUploadIDs, err := parseConcat(concatHeader) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } @@ -246,13 +250,13 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) if isFinal { // A final upload must not contain a chunk within the creation request if containsChunk { - handler.sendError(w, r, ErrModifyFinal) + handler.sendError(c, ErrModifyFinal) return } - partialUploads, size, err = handler.sizeOfUploads(ctx, partialUploadIDs) + partialUploads, size, err = handler.sizeOfUploads(c, partialUploadIDs) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } } else { @@ -260,14 +264,14 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) uploadDeferLengthHeader := r.Header.Get("Upload-Defer-Length") size, sizeIsDeferred, err = handler.validateNewUploadLengthHeaders(uploadLengthHeader, uploadDeferLengthHeader) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } } // Test whether the size is still allowed if handler.config.MaxSize > 0 && size > handler.config.MaxSize { - handler.sendError(w, r, ErrMaxSizeExceeded) + handler.sendError(c, ErrMaxSizeExceeded) return } @@ -291,21 +295,21 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) if handler.config.PreUploadCreateCallback != nil { resp2, err := handler.config.PreUploadCreateCallback(newHookEvent(info, r)) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } resp = resp.MergeWith(resp2) } - upload, err := handler.composer.Core.NewUpload(ctx, info) + upload, err := handler.composer.Core.NewUpload(c, info) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } - info, err = upload.GetInfo(ctx) + info, err = upload.GetInfo(c) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } @@ -325,8 +329,8 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) if isFinal { concatableUpload := handler.composer.Concater.AsConcatableUpload(upload) - if err := concatableUpload.ConcatUploads(ctx, partialUploads); err != nil { - handler.sendError(w, r, err) + if err := concatableUpload.ConcatUploads(c, partialUploads); err != nil { + handler.sendError(c, err) return } info.Offset = size @@ -338,63 +342,63 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) if containsChunk { if handler.composer.UsesLocker { - lock, err := handler.lockUpload(id) + lock, err := handler.lockUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } defer lock.Unlock() } - resp, err = handler.writeChunk(ctx, upload, info, resp, r) + resp, err = handler.writeChunk(c, resp, upload, info) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } } else if !sizeIsDeferred && size == 0 { // Directly finish the upload if the upload is empty (i.e. has a size of 0). // This statement is in an else-if block to avoid causing duplicate calls // to finishUploadIfComplete if an upload is empty and contains a chunk. 
- resp, err = handler.finishUploadIfComplete(ctx, upload, info, resp, r) + resp, err = handler.finishUploadIfComplete(c, resp, upload, info) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } } - handler.sendResp(w, r, resp) + handler.sendResp(c, resp) } // HeadFile returns the length and offset for the HEAD request func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + c := newContext(w, r) id, err := extractIDFromPath(r.URL.Path) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } if handler.composer.UsesLocker { - lock, err := handler.lockUpload(id) + lock, err := handler.lockUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } defer lock.Unlock() } - upload, err := handler.composer.Core.GetUpload(ctx, id) + upload, err := handler.composer.Core.GetUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } - info, err := upload.GetInfo(ctx) + info, err := upload.GetInfo(c) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } @@ -432,63 +436,63 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request) resp.Headers["Cache-Control"] = "no-store" resp.Headers["Upload-Offset"] = strconv.FormatInt(info.Offset, 10) - handler.sendResp(w, r, resp) + handler.sendResp(c, resp) } // PatchFile adds a chunk to an upload. This operation is only allowed // if enough space in the upload is left. func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + c := newContext(w, r) // Check for presence of application/offset+octet-stream if r.Header.Get("Content-Type") != "application/offset+octet-stream" { - handler.sendError(w, r, ErrInvalidContentType) + handler.sendError(c, ErrInvalidContentType) return } // Check for presence of a valid Upload-Offset Header offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64) if err != nil || offset < 0 { - handler.sendError(w, r, ErrInvalidOffset) + handler.sendError(c, ErrInvalidOffset) return } id, err := extractIDFromPath(r.URL.Path) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } if handler.composer.UsesLocker { - lock, err := handler.lockUpload(id) + lock, err := handler.lockUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } defer lock.Unlock() } - upload, err := handler.composer.Core.GetUpload(ctx, id) + upload, err := handler.composer.Core.GetUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } - info, err := upload.GetInfo(ctx) + info, err := upload.GetInfo(c) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } // Modifying a final upload is not allowed if info.IsFinal { - handler.sendError(w, r, ErrModifyFinal) + handler.sendError(c, ErrModifyFinal) return } if offset != info.Offset { - handler.sendError(w, r, ErrMismatchOffset) + handler.sendError(c, ErrMismatchOffset) return } @@ -500,28 +504,28 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request // Do not proxy the call to the data store if the upload is already completed if !info.SizeIsDeferred && info.Offset == info.Size { resp.Headers["Upload-Offset"] = strconv.FormatInt(offset, 10) - handler.sendResp(w, r, resp) + handler.sendResp(c, resp) return } if r.Header.Get("Upload-Length") != 
"" { if !handler.composer.UsesLengthDeferrer { - handler.sendError(w, r, ErrNotImplemented) + handler.sendError(c, ErrNotImplemented) return } if !info.SizeIsDeferred { - handler.sendError(w, r, ErrInvalidUploadLength) + handler.sendError(c, ErrInvalidUploadLength) return } uploadLength, err := strconv.ParseInt(r.Header.Get("Upload-Length"), 10, 64) if err != nil || uploadLength < 0 || uploadLength < info.Offset || (handler.config.MaxSize > 0 && uploadLength > handler.config.MaxSize) { - handler.sendError(w, r, ErrInvalidUploadLength) + handler.sendError(c, ErrInvalidUploadLength) return } lengthDeclarableUpload := handler.composer.LengthDeferrer.AsLengthDeclarableUpload(upload) - if err := lengthDeclarableUpload.DeclareLength(ctx, uploadLength); err != nil { - handler.sendError(w, r, err) + if err := lengthDeclarableUpload.DeclareLength(c, uploadLength); err != nil { + handler.sendError(c, err) return } @@ -529,20 +533,21 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request info.SizeIsDeferred = false } - resp, err = handler.writeChunk(ctx, upload, info, resp, r) + resp, err = handler.writeChunk(c, resp, upload, info) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } - handler.sendResp(w, r, resp) + handler.sendResp(c, resp) } // writeChunk reads the body from the requests r and appends it to the upload // with the corresponding id. Afterwards, it will set the necessary response // headers but will not send the response. -func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, info FileInfo, resp HTTPResponse, r *http.Request) (HTTPResponse, error) { +func (handler *UnroutedHandler) writeChunk(c *httpContext, resp HTTPResponse, upload Upload, info FileInfo) (HTTPResponse, error) { // Get Content-Length if possible + r := c.req length := r.ContentLength offset := info.Offset id := info.ID @@ -577,7 +582,7 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i // available in the case of a malicious request. 
if r.Body != nil { // Limit the data read from the request's body to the allowed maximum - reader := newBodyReader(io.LimitReader(r.Body, maxSize)) + c.body = newBodyReader(r.Body, maxSize) // We use a context object to allow the hook system to cancel an upload uploadCtx, stopUpload := context.WithCancel(context.Background()) @@ -592,18 +597,19 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i go func() { // Interrupt the Read() call from the request body <-uploadCtx.Done() + // TODO: Consider using CloseWithError function from BodyReader terminateUpload = true r.Body.Close() }() if handler.config.NotifyUploadProgress { - stopProgressEvents := handler.sendProgressMessages(newHookEvent(info, r), reader) + stopProgressEvents := handler.sendProgressMessages(newHookEvent(info, r), c.body) defer close(stopProgressEvents) } - bytesWritten, err = upload.WriteChunk(ctx, offset, reader) + bytesWritten, err = upload.WriteChunk(c, offset, c.body) if terminateUpload && handler.composer.UsesTerminater { - if terminateErr := handler.terminateUpload(ctx, upload, info, r); terminateErr != nil { + if terminateErr := handler.terminateUpload(c, upload, info); terminateErr != nil { // We only log this error and not show it to the user since this // termination error is not relevant to the uploading client handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error()) @@ -612,7 +618,7 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i // If we encountered an error while reading the body from the HTTP request, log it, but only include // it in the response, if the store did not also return an error. - if bodyErr := reader.hasError(); bodyErr != nil { + if bodyErr := c.body.hasError(); bodyErr != nil { handler.log("BodyReadError", "id", id, "error", bodyErr.Error()) if err == nil { err = bodyErr @@ -638,17 +644,19 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i handler.Metrics.incBytesReceived(uint64(bytesWritten)) info.Offset = newOffset - return handler.finishUploadIfComplete(ctx, upload, info, resp, r) + return handler.finishUploadIfComplete(c, resp, upload, info) } // finishUploadIfComplete checks whether an upload is completed (i.e. upload offset // matches upload size) and if so, it will call the data store's FinishUpload // function and send the necessary message on the CompleteUpload channel. -func (handler *UnroutedHandler) finishUploadIfComplete(ctx context.Context, upload Upload, info FileInfo, resp HTTPResponse, r *http.Request) (HTTPResponse, error) { +func (handler *UnroutedHandler) finishUploadIfComplete(c *httpContext, resp HTTPResponse, upload Upload, info FileInfo) (HTTPResponse, error) { + r := c.req + // If the upload is completed, ... if !info.SizeIsDeferred && info.Offset == info.Size { // ... allow custom mechanism to finish and cleanup the upload - if err := upload.FinishUpload(ctx); err != nil { + if err := upload.FinishUpload(c); err != nil { return resp, err } @@ -674,33 +682,33 @@ func (handler *UnroutedHandler) finishUploadIfComplete(ctx context.Context, uplo // GetFile handles requests to download a file using a GET request. This is not // part of the specification. 
func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + c := newContext(w, r) id, err := extractIDFromPath(r.URL.Path) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } if handler.composer.UsesLocker { - lock, err := handler.lockUpload(id) + lock, err := handler.lockUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } defer lock.Unlock() } - upload, err := handler.composer.Core.GetUpload(ctx, id) + upload, err := handler.composer.Core.GetUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } - info, err := upload.GetInfo(ctx) + info, err := upload.GetInfo(c) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } @@ -718,17 +726,17 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request) // If no data has been uploaded yet, respond with an empty "204 No Content" status. if info.Offset == 0 { resp.StatusCode = http.StatusNoContent - handler.sendResp(w, r, resp) + handler.sendResp(c, resp) return } - src, err := upload.GetReader(ctx) + src, err := upload.GetReader(c) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } - handler.sendResp(w, r, resp) + handler.sendResp(c, resp) io.Copy(w, src) // Try to close the reader if the io.Closer interface is implemented @@ -800,52 +808,52 @@ func filterContentType(info FileInfo) (contentType string, contentDisposition st // DelFile terminates an upload permanently. func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + c := newContext(w, r) // Abort the request handling if the required interface is not implemented if !handler.composer.UsesTerminater { - handler.sendError(w, r, ErrNotImplemented) + handler.sendError(c, ErrNotImplemented) return } id, err := extractIDFromPath(r.URL.Path) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } if handler.composer.UsesLocker { - lock, err := handler.lockUpload(id) + lock, err := handler.lockUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } defer lock.Unlock() } - upload, err := handler.composer.Core.GetUpload(ctx, id) + upload, err := handler.composer.Core.GetUpload(c, id) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } var info FileInfo if handler.config.NotifyTerminatedUploads { - info, err = upload.GetInfo(ctx) + info, err = upload.GetInfo(c) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } } - err = handler.terminateUpload(ctx, upload, info, r) + err = handler.terminateUpload(c, upload, info) if err != nil { - handler.sendError(w, r, err) + handler.sendError(c, err) return } - handler.sendResp(w, r, HTTPResponse{ + handler.sendResp(c, HTTPResponse{ StatusCode: http.StatusNoContent, }) } @@ -855,16 +863,16 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) // and updates the statistics. // Note the the info argument is only needed if the terminated uploads // notifications are enabled. 
-func (handler *UnroutedHandler) terminateUpload(ctx context.Context, upload Upload, info FileInfo, r *http.Request) error { +func (handler *UnroutedHandler) terminateUpload(c *httpContext, upload Upload, info FileInfo) error { terminatableUpload := handler.composer.Terminater.AsTerminatableUpload(upload) - err := terminatableUpload.Terminate(ctx) + err := terminatableUpload.Terminate(c) if err != nil { return err } if handler.config.NotifyTerminatedUploads { - handler.TerminatedUploads <- newHookEvent(info, r) + handler.TerminatedUploads <- newHookEvent(info, c.req) } handler.Metrics.incUploadsTerminated() @@ -874,7 +882,7 @@ func (handler *UnroutedHandler) terminateUpload(ctx context.Context, upload Uplo // Send the error in the response body. The status code will be looked up in // ErrStatusCodes. If none is found 500 Internal Error will be used. -func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request, err error) { +func (handler *UnroutedHandler) sendError(c *httpContext, err error) { // Errors for read timeouts contain too much information which is not // necessary for us and makes grouping for the metrics harder. The error // message looks like: read tcp 127.0.0.1:1080->127.0.0.1:53673: i/o timeout @@ -909,6 +917,8 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request // err = nil //} + r := c.req + detailedErr, ok := err.(Error) if !ok { handler.log("InternalServerError", "message", err.Error(), "method", r.Method, "path", r.URL.Path, "requestId", getRequestId(r)) @@ -921,15 +931,15 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request detailedErr.HTTPResponse.Body = "" } - handler.sendResp(w, r, detailedErr.HTTPResponse) + handler.sendResp(c, detailedErr.HTTPResponse) handler.Metrics.incErrorsTotal(detailedErr) } // sendResp writes the header to w with the specified status code. -func (handler *UnroutedHandler) sendResp(w http.ResponseWriter, r *http.Request, resp HTTPResponse) { - resp.writeTo(w) +func (handler *UnroutedHandler) sendResp(c *httpContext, resp HTTPResponse) { + resp.writeTo(c.res) - handler.log("ResponseOutgoing", "status", strconv.Itoa(resp.StatusCode), "method", r.Method, "path", r.URL.Path, "requestId", getRequestId(r), "body", resp.Body) + handler.log("ResponseOutgoing", "status", strconv.Itoa(resp.StatusCode), "method", c.req.Method, "path", c.req.URL.Path, "requestId", getRequestId(c.req), "body", resp.Body) } // Make an absolute URLs to the given upload id. If the base path is absolute @@ -1073,13 +1083,23 @@ func (handler *UnroutedHandler) validateNewUploadLengthHeaders(uploadLengthHeade // lockUpload creates a new lock for the given upload ID and attempts to lock it. // The created lock is returned if it was aquired successfully. 
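// If the lock is currently held by another request, that request is asked to
// release it: its request body, if any, is closed with ErrUploadInterrupted.
// If the lock still cannot be acquired within the hard-coded three-second
// timeout, the locker is expected to return ErrLockTimeout.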
-func (handler *UnroutedHandler) lockUpload(id string) (Lock, error) { +func (handler *UnroutedHandler) lockUpload(c *httpContext, id string) (Lock, error) { lock, err := handler.composer.Locker.NewLock(id) if err != nil { return nil, err } - if err := lock.Lock(); err != nil { + // TODO: Make lock timeout configurable + ctx, cancelContext := context.WithTimeout(context.Background(), 3*time.Second) + defer cancelContext() + releaseLock := func() { + if c.body != nil { + handler.log("UploadInterrupted", "id", id, "requestId", getRequestId(c.req)) + c.body.closeWithError(ErrUploadInterrupted) + } + } + + if err := lock.Lock(ctx, releaseLock); err != nil { return nil, err } diff --git a/pkg/memorylocker/memorylocker.go b/pkg/memorylocker/memorylocker.go index 504ae3e..7fb4463 100644 --- a/pkg/memorylocker/memorylocker.go +++ b/pkg/memorylocker/memorylocker.go @@ -1,5 +1,6 @@ // Package memorylocker provides an in-memory locking mechanism. // +// TODO: Update comment // When multiple processes are attempting to access an upload, whether it be // by reading or writing, a synchronization mechanism is required to prevent // data corruption, especially to ensure correct offset values and the proper @@ -11,6 +12,7 @@ package memorylocker import ( + "context" "sync" "github.com/tus/tusd/pkg/handler" @@ -20,14 +22,19 @@ import ( // cheap mechanism. Locks will only exist as long as this object is kept in // reference and will be erased if the program exits. type MemoryLocker struct { - locks map[string]struct{} - mutex sync.Mutex + locks map[string]lockEntry + mutex sync.RWMutex +} + +type lockEntry struct { + lockReleased chan struct{} + requestRelease func() } // New creates a new in-memory locker. func New() *MemoryLocker { return &MemoryLocker{ - locks: make(map[string]struct{}), + locks: make(map[string]lockEntry), } } @@ -46,16 +53,40 @@ type memoryLock struct { } // Lock tries to obtain the exclusive lock. -func (lock memoryLock) Lock() error { - lock.locker.mutex.Lock() - defer lock.locker.mutex.Unlock() +func (lock memoryLock) Lock(ctx context.Context, requestRelease func()) error { + lock.locker.mutex.RLock() + entry, ok := lock.locker.locks[lock.id] + lock.locker.mutex.RUnlock() - // Ensure file is not locked - if _, ok := lock.locker.locks[lock.id]; ok { - return handler.ErrFileLocked +requestRelease: + if ok { + // TODO: Make this channel? + // TODO: Should we ensure this is only called once? 
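+	// Another caller already holds the lock: ask the holder to release it, then
+	// wait until either the context is cancelled (resulting in ErrLockTimeout)
+	// or Unlock closes the lockReleased channel.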
+ entry.requestRelease() + select { + case <-ctx.Done(): + return handler.ErrLockTimeout + case <-entry.lockReleased: + } } - lock.locker.locks[lock.id] = struct{}{} + lock.locker.mutex.Lock() + // Check that the lock has not already been created in the meantime + entry, ok = lock.locker.locks[lock.id] + if ok { + // Lock has been created in the meantime, so we must wait again until it is free + lock.locker.mutex.Unlock() + goto requestRelease + } + + // No lock exists, so we can create it + entry = lockEntry{ + lockReleased: make(chan struct{}), + requestRelease: requestRelease, + } + + lock.locker.locks[lock.id] = entry + lock.locker.mutex.Unlock() return nil } @@ -64,10 +95,14 @@ func (lock memoryLock) Lock() error { func (lock memoryLock) Unlock() error { lock.locker.mutex.Lock() - // Deleting a non-existing key does not end in unexpected errors or panic - // since this operation results in a no-op + lockReleased := lock.locker.locks[lock.id].lockReleased + + // Delete the lock entry entirely delete(lock.locker.locks, lock.id) lock.locker.mutex.Unlock() + + close(lockReleased) + return nil } diff --git a/pkg/memorylocker/memorylocker_test.go b/pkg/memorylocker/memorylocker_test.go index 9fe9224..2aa63e6 100644 --- a/pkg/memorylocker/memorylocker_test.go +++ b/pkg/memorylocker/memorylocker_test.go @@ -1,16 +1,17 @@ package memorylocker import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" - "github.com/tus/tusd/pkg/handler" ) var _ handler.Locker = &MemoryLocker{} -func TestMemoryLocker(t *testing.T) { +func TestMemoryLocker_LockAndUnlock(t *testing.T) { a := assert.New(t) locker := New() @@ -18,13 +19,62 @@ func TestMemoryLocker(t *testing.T) { lock1, err := locker.NewLock("one") a.NoError(err) - a.NoError(lock1.Lock()) - a.Equal(handler.ErrFileLocked, lock1.Lock()) + a.NoError(lock1.Lock(context.Background(), func() { + panic("must not be called") + })) + a.NoError(lock1.Unlock()) +} + +func TestMemoryLocker_Timeout(t *testing.T) { + a := assert.New(t) + + locker := New() + releaseRequestCalled := false + + lock1, err := locker.NewLock("one") + a.NoError(err) + a.NoError(lock1.Lock(context.Background(), func() { + releaseRequestCalled = true + // We note that the function has been called, but do not + // release the lock + })) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() lock2, err := locker.NewLock("one") a.NoError(err) - a.Equal(handler.ErrFileLocked, lock2.Lock()) + err = lock2.Lock(ctx, func() { + panic("must not be called") + }) - a.NoError(lock1.Unlock()) - a.NoError(lock1.Unlock()) + a.Equal(err, handler.ErrLockTimeout) + a.True(releaseRequestCalled) +} + +func TestMemoryLocker_RequestUnlock(t *testing.T) { + a := assert.New(t) + + locker := New() + releaseRequestCalled := false + + lock1, err := locker.NewLock("one") + a.NoError(err) + a.NoError(lock1.Lock(context.Background(), func() { + releaseRequestCalled = true + <-time.After(10 * time.Millisecond) + a.NoError(lock1.Unlock()) + })) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + lock2, err := locker.NewLock("one") + a.NoError(err) + a.NoError(lock2.Lock(ctx, func() { + panic("must not be called") + })) + a.NoError(lock2.Unlock()) + + a.True(releaseRequestCalled) }
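For illustration only (not part of this patch): a minimal sketch of how a caller could drive the new cancelable Lock interface, mirroring the three-second timeout that lockUpload uses. The helper name acquireUploadLock and the log messages are made up for this example.

package main

import (
	"context"
	"log"
	"time"

	"github.com/tus/tusd/pkg/handler"
	"github.com/tus/tusd/pkg/memorylocker"
)

// acquireUploadLock is a hypothetical helper that acquires an exclusive lock
// for the given upload ID, asking the current holder (if any) to release it
// and giving up after three seconds, similar to lockUpload in this patch.
func acquireUploadLock(locker handler.Locker, id string) (handler.Lock, error) {
	lock, err := locker.NewLock(id)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// The callback is invoked when another caller wants this lock. A real
	// holder would stop its current work and call Unlock as soon as possible.
	err = lock.Lock(ctx, func() {
		log.Printf("release of lock for upload %s requested", id)
	})
	if err != nil {
		// Depending on the locker, this is handler.ErrFileLocked or
		// handler.ErrLockTimeout.
		return nil, err
	}
	return lock, nil
}

func main() {
	locker := memorylocker.New()

	lock, err := acquireUploadLock(locker, "example-upload")
	if err != nil {
		log.Fatal(err)
	}
	defer lock.Unlock()

	// ... read or write the upload while holding the lock ...
}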