v2: Implement cancelable lock mechanism (#667)
* handler: Implement prototype of new locking back-end * memorylocker2: Switch to channel for release notification * handler: Update locker interface * handler: Add method to close body with error * memorylocker: Replace with new implementation * filelocker: Adjust methods to match interface * handler: Introduce new httpContext * handler: Implement upload interruption * handler: Adjust tests to new inferfaces * handler, memorylocker: Cancel context to avoid leaks
This commit is contained in:
parent
04e786e81a
commit
e52139f977
1
go.mod
1
go.mod
|
@ -18,6 +18,7 @@ require (
|
||||||
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0
|
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
github.com/vimeo/go-util v1.4.1
|
github.com/vimeo/go-util v1.4.1
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||||
google.golang.org/api v0.70.0
|
google.golang.org/api v0.70.0
|
||||||
google.golang.org/grpc v1.44.0
|
google.golang.org/grpc v1.44.0
|
||||||
gopkg.in/Acconut/lockfile.v1 v1.1.0
|
gopkg.in/Acconut/lockfile.v1 v1.1.0
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -457,6 +457,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
|
||||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
package filelocker
|
package filelocker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
@ -58,7 +59,8 @@ type fileUploadLock struct {
|
||||||
file lockfile.Lockfile
|
file lockfile.Lockfile
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lock fileUploadLock) Lock() error {
|
// TODO: Implement functionality for ctx and requestRelease.
|
||||||
|
func (lock fileUploadLock) Lock(ctx context.Context, requestRelease func()) error {
|
||||||
err := lock.file.TryLock()
|
err := lock.file.TryLock()
|
||||||
if err == lockfile.ErrBusy {
|
if err == lockfile.ErrBusy {
|
||||||
return handler.ErrFileLocked
|
return handler.ErrFileLocked
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package filelocker
|
package filelocker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -21,12 +22,12 @@ func TestFileLocker(t *testing.T) {
|
||||||
lock1, err := locker.NewLock("one")
|
lock1, err := locker.NewLock("one")
|
||||||
a.NoError(err)
|
a.NoError(err)
|
||||||
|
|
||||||
a.NoError(lock1.Lock())
|
a.NoError(lock1.Lock(context.TODO(), nil))
|
||||||
a.Equal(handler.ErrFileLocked, lock1.Lock())
|
a.Equal(handler.ErrFileLocked, lock1.Lock(context.TODO(), nil))
|
||||||
|
|
||||||
lock2, err := locker.NewLock("one")
|
lock2, err := locker.NewLock("one")
|
||||||
a.NoError(err)
|
a.NoError(err)
|
||||||
a.Equal(handler.ErrFileLocked, lock2.Lock())
|
a.Equal(handler.ErrFileLocked, lock2.Lock(context.TODO(), nil))
|
||||||
|
|
||||||
a.NoError(lock1.Unlock())
|
a.NoError(lock1.Unlock())
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,13 +14,15 @@ import (
|
||||||
// In addition, the bodyReader keeps track of how many bytes were read.
|
// In addition, the bodyReader keeps track of how many bytes were read.
|
||||||
type bodyReader struct {
|
type bodyReader struct {
|
||||||
reader io.Reader
|
reader io.Reader
|
||||||
|
closer io.Closer
|
||||||
err error
|
err error
|
||||||
bytesCounter int64
|
bytesCounter int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBodyReader(r io.Reader) *bodyReader {
|
func newBodyReader(r io.ReadCloser, maxSize int64) *bodyReader {
|
||||||
return &bodyReader{
|
return &bodyReader{
|
||||||
reader: r,
|
reader: io.LimitReader(r, maxSize),
|
||||||
|
closer: r,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,3 +53,8 @@ func (r bodyReader) hasError() error {
|
||||||
func (r *bodyReader) bytesRead() int64 {
|
func (r *bodyReader) bytesRead() int64 {
|
||||||
return atomic.LoadInt64(&r.bytesCounter)
|
return atomic.LoadInt64(&r.bytesCounter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *bodyReader) closeWithError(err error) {
|
||||||
|
r.closer.Close()
|
||||||
|
r.err = err
|
||||||
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package handler_test
|
package handler_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -38,14 +37,14 @@ func TestConcat(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
IsPartial: true,
|
IsPartial: true,
|
||||||
IsFinal: false,
|
IsFinal: false,
|
||||||
PartialUploads: nil,
|
PartialUploads: nil,
|
||||||
MetaData: make(map[string]string),
|
MetaData: make(map[string]string),
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
IsPartial: true,
|
IsPartial: true,
|
||||||
|
@ -77,8 +76,8 @@ func TestConcat(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
IsPartial: true,
|
IsPartial: true,
|
||||||
}, nil),
|
}, nil),
|
||||||
|
@ -114,26 +113,26 @@ func TestConcat(t *testing.T) {
|
||||||
uploadC := NewMockFullUpload(ctrl)
|
uploadC := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "a").Return(uploadA, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil),
|
||||||
uploadA.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
IsPartial: true,
|
IsPartial: true,
|
||||||
Size: 5,
|
Size: 5,
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
}, nil),
|
}, nil),
|
||||||
store.EXPECT().GetUpload(context.Background(), "b").Return(uploadB, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil),
|
||||||
uploadB.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
IsPartial: true,
|
IsPartial: true,
|
||||||
Size: 5,
|
Size: 5,
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
}, nil),
|
}, nil),
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 10,
|
Size: 10,
|
||||||
IsPartial: false,
|
IsPartial: false,
|
||||||
IsFinal: true,
|
IsFinal: true,
|
||||||
PartialUploads: []string{"a", "b"},
|
PartialUploads: []string{"a", "b"},
|
||||||
MetaData: make(map[string]string),
|
MetaData: make(map[string]string),
|
||||||
}).Return(uploadC, nil),
|
}).Return(uploadC, nil),
|
||||||
uploadC.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 10,
|
Size: 10,
|
||||||
IsPartial: false,
|
IsPartial: false,
|
||||||
|
@ -142,7 +141,7 @@ func TestConcat(t *testing.T) {
|
||||||
MetaData: make(map[string]string),
|
MetaData: make(map[string]string),
|
||||||
}, nil),
|
}, nil),
|
||||||
store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC),
|
store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC),
|
||||||
uploadC.EXPECT().ConcatUploads(context.Background(), []Upload{uploadA, uploadB}).Return(nil),
|
uploadC.EXPECT().ConcatUploads(gomock.Any(), []Upload{uploadA, uploadB}).Return(nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -188,8 +187,8 @@ func TestConcat(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
IsFinal: true,
|
IsFinal: true,
|
||||||
PartialUploads: []string{"a", "b"},
|
PartialUploads: []string{"a", "b"},
|
||||||
|
@ -226,8 +225,8 @@ func TestConcat(t *testing.T) {
|
||||||
// This upload is still unfinished (mismatching offset and size) and
|
// This upload is still unfinished (mismatching offset and size) and
|
||||||
// will therefore cause the POST request to fail.
|
// will therefore cause the POST request to fail.
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "c").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "c").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "c",
|
ID: "c",
|
||||||
IsPartial: true,
|
IsPartial: true,
|
||||||
Size: 5,
|
Size: 5,
|
||||||
|
@ -256,8 +255,8 @@ func TestConcat(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "huge").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "huge").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "huge",
|
ID: "huge",
|
||||||
Size: 1000,
|
Size: 1000,
|
||||||
Offset: 1000,
|
Offset: 1000,
|
||||||
|
@ -286,8 +285,8 @@ func TestConcat(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 10,
|
Size: 10,
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// httpContext is wrapper around context.Context that also carries the
|
||||||
|
// corresponding HTTP request and response writer, as well as an
|
||||||
|
// optional body reader
|
||||||
|
// TODO: Consider including HTTPResponse as well
|
||||||
|
type httpContext struct {
|
||||||
|
context.Context
|
||||||
|
|
||||||
|
res http.ResponseWriter
|
||||||
|
req *http.Request
|
||||||
|
body *bodyReader
|
||||||
|
}
|
||||||
|
|
||||||
|
func newContext(w http.ResponseWriter, r *http.Request) *httpContext {
|
||||||
|
return &httpContext{
|
||||||
|
// TODO: Try to reuse the request's context in the future
|
||||||
|
Context: context.Background(),
|
||||||
|
res: w,
|
||||||
|
req: r,
|
||||||
|
body: nil, // body can be filled later for PATCH requests
|
||||||
|
}
|
||||||
|
}
|
|
@ -149,11 +149,15 @@ type Locker interface {
|
||||||
type Lock interface {
|
type Lock interface {
|
||||||
// Lock attempts to obtain an exclusive lock for the upload specified
|
// Lock attempts to obtain an exclusive lock for the upload specified
|
||||||
// by its id.
|
// by its id.
|
||||||
// If this operation fails because the resource is already locked, the
|
// If the lock can be acquired, it will return without error. The requestUnlock
|
||||||
// tusd.ErrFileLocked must be returned. If no error is returned, the attempt
|
// callback is invoked when another caller attempts to create a lock. In this
|
||||||
// is consider to be successful and the upload to be locked until UnlockUpload
|
// case, the holder of the lock should attempt to release the lock as soon
|
||||||
// is invoked for the same upload.
|
// as possible
|
||||||
Lock() error
|
// If the lock is already held, the holder's requestUnlock function will be
|
||||||
|
// invoked to request the lock to be released. If the context is cancelled before
|
||||||
|
// the lock can be acquired, ErrLockTimeout will be returned without acquiring
|
||||||
|
// the lock.
|
||||||
|
Lock(ctx context.Context, requestUnlock func()) error
|
||||||
// Unlock releases an existing lock for the given upload.
|
// Unlock releases an existing lock for the given upload.
|
||||||
Unlock() error
|
Unlock() error
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package handler_test
|
package handler_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -34,9 +33,9 @@ func TestGet(t *testing.T) {
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
locker.EXPECT().NewLock("yes").Return(lock, nil),
|
locker.EXPECT().NewLock("yes").Return(lock, nil),
|
||||||
lock.EXPECT().Lock().Return(nil),
|
lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil),
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
Size: 20,
|
Size: 20,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
|
@ -44,7 +43,7 @@ func TestGet(t *testing.T) {
|
||||||
"filetype": "image/jpeg",
|
"filetype": "image/jpeg",
|
||||||
},
|
},
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().GetReader(context.Background()).Return(reader, nil),
|
upload.EXPECT().GetReader(gomock.Any()).Return(reader, nil),
|
||||||
lock.EXPECT().Unlock().Return(nil),
|
lock.EXPECT().Unlock().Return(nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -79,8 +78,8 @@ func TestGet(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
}, nil),
|
}, nil),
|
||||||
)
|
)
|
||||||
|
@ -107,8 +106,8 @@ func TestGet(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
"filetype": "non-a-valid-mime-type",
|
"filetype": "non-a-valid-mime-type",
|
||||||
|
@ -139,8 +138,8 @@ func TestGet(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document.v1",
|
"filetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document.v1",
|
||||||
|
|
|
@ -293,17 +293,17 @@ func (m *MockFullLock) EXPECT() *MockFullLockMockRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock mocks base method
|
// Lock mocks base method
|
||||||
func (m *MockFullLock) Lock() error {
|
func (m *MockFullLock) Lock(ctx context.Context, requestUnlock func()) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "Lock")
|
ret := m.ctrl.Call(m, "Lock", ctx, requestUnlock)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock indicates an expected call of Lock
|
// Lock indicates an expected call of Lock
|
||||||
func (mr *MockFullLockMockRecorder) Lock() *gomock.Call {
|
func (mr *MockFullLockMockRecorder) Lock(ctx, requestUnlock interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockFullLock)(nil).Lock))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockFullLock)(nil).Lock), ctx, requestUnlock)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock mocks base method
|
// Unlock mocks base method
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package handler_test
|
package handler_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -19,9 +18,9 @@ func TestHead(t *testing.T) {
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
locker.EXPECT().NewLock("yes").Return(lock, nil),
|
locker.EXPECT().NewLock("yes").Return(lock, nil),
|
||||||
lock.EXPECT().Lock().Return(nil),
|
lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil),
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
Offset: 11,
|
Offset: 11,
|
||||||
Size: 44,
|
Size: 44,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
|
@ -64,7 +63,7 @@ func TestHead(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
SubTest(t, "UploadNotFoundFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
|
SubTest(t, "UploadNotFoundFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
|
||||||
store.EXPECT().GetUpload(context.Background(), "no").Return(nil, ErrNotFound)
|
store.EXPECT().GetUpload(gomock.Any(), "no").Return(nil, ErrNotFound)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
StoreComposer: composer,
|
StoreComposer: composer,
|
||||||
|
@ -91,8 +90,8 @@ func TestHead(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
SizeIsDeferred: true,
|
SizeIsDeferred: true,
|
||||||
Size: 0,
|
Size: 0,
|
||||||
}, nil),
|
}, nil),
|
||||||
|
@ -121,8 +120,8 @@ func TestHead(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
SizeIsDeferred: false,
|
SizeIsDeferred: false,
|
||||||
Size: 10,
|
Size: 10,
|
||||||
}, nil),
|
}, nil),
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package handler_test
|
package handler_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
@ -23,14 +22,14 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
Size: 10,
|
Size: 10,
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil),
|
||||||
upload.EXPECT().FinishUpload(context.Background()),
|
upload.EXPECT().FinishUpload(gomock.Any()),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -75,14 +74,14 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
Size: 10,
|
Size: 10,
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil),
|
||||||
upload.EXPECT().FinishUpload(context.Background()),
|
upload.EXPECT().FinishUpload(gomock.Any()),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -112,8 +111,8 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 20,
|
Offset: 20,
|
||||||
Size: 20,
|
Size: 20,
|
||||||
|
@ -141,7 +140,7 @@ func TestPatch(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
SubTest(t, "UploadNotFoundFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
|
SubTest(t, "UploadNotFoundFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
|
||||||
store.EXPECT().GetUpload(context.Background(), "no").Return(nil, ErrNotFound)
|
store.EXPECT().GetUpload(gomock.Any(), "no").Return(nil, ErrNotFound)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
StoreComposer: composer,
|
StoreComposer: composer,
|
||||||
|
@ -165,8 +164,8 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
}, nil),
|
}, nil),
|
||||||
|
@ -194,8 +193,8 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
Size: 10,
|
Size: 10,
|
||||||
|
@ -268,14 +267,14 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
Size: 20,
|
Size: 20,
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil),
|
||||||
upload.EXPECT().FinishUpload(context.Background()),
|
upload.EXPECT().FinishUpload(gomock.Any()),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -310,17 +309,17 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
Size: 0,
|
Size: 0,
|
||||||
SizeIsDeferred: true,
|
SizeIsDeferred: true,
|
||||||
}, nil),
|
}, nil),
|
||||||
store.EXPECT().AsLengthDeclarableUpload(upload).Return(upload),
|
store.EXPECT().AsLengthDeclarableUpload(upload).Return(upload),
|
||||||
upload.EXPECT().DeclareLength(context.Background(), int64(20)),
|
upload.EXPECT().DeclareLength(gomock.Any(), int64(20)),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hellothisismore")).Return(int64(15), nil),
|
||||||
upload.EXPECT().FinishUpload(context.Background()),
|
upload.EXPECT().FinishUpload(gomock.Any()),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -353,16 +352,16 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 20,
|
Offset: 20,
|
||||||
Size: 0,
|
Size: 0,
|
||||||
SizeIsDeferred: true,
|
SizeIsDeferred: true,
|
||||||
}, nil),
|
}, nil),
|
||||||
store.EXPECT().AsLengthDeclarableUpload(upload).Return(upload),
|
store.EXPECT().AsLengthDeclarableUpload(upload).Return(upload),
|
||||||
upload.EXPECT().DeclareLength(context.Background(), int64(20)),
|
upload.EXPECT().DeclareLength(gomock.Any(), int64(20)),
|
||||||
upload.EXPECT().FinishUpload(context.Background()),
|
upload.EXPECT().FinishUpload(gomock.Any()),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -392,26 +391,26 @@ func TestPatch(t *testing.T) {
|
||||||
upload2 := NewMockFullUpload(ctrl)
|
upload2 := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload1, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload1, nil),
|
||||||
upload1.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload1.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 5,
|
Offset: 5,
|
||||||
Size: 0,
|
Size: 0,
|
||||||
SizeIsDeferred: true,
|
SizeIsDeferred: true,
|
||||||
}, nil),
|
}, nil),
|
||||||
store.EXPECT().AsLengthDeclarableUpload(upload1).Return(upload1),
|
store.EXPECT().AsLengthDeclarableUpload(upload1).Return(upload1),
|
||||||
upload1.EXPECT().DeclareLength(context.Background(), int64(20)),
|
upload1.EXPECT().DeclareLength(gomock.Any(), int64(20)),
|
||||||
upload1.EXPECT().WriteChunk(context.Background(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil),
|
upload1.EXPECT().WriteChunk(gomock.Any(), int64(5), NewReaderMatcher("hello")).Return(int64(5), nil),
|
||||||
|
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload2, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload2, nil),
|
||||||
upload2.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload2.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 10,
|
Offset: 10,
|
||||||
Size: 20,
|
Size: 20,
|
||||||
SizeIsDeferred: false,
|
SizeIsDeferred: false,
|
||||||
}, nil),
|
}, nil),
|
||||||
upload2.EXPECT().WriteChunk(context.Background(), int64(10), NewReaderMatcher("thisismore")).Return(int64(10), nil),
|
upload2.EXPECT().WriteChunk(gomock.Any(), int64(10), NewReaderMatcher("thisismore")).Return(int64(10), nil),
|
||||||
upload2.EXPECT().FinishUpload(context.Background()),
|
upload2.EXPECT().FinishUpload(gomock.Any()),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -460,14 +459,14 @@ func TestPatch(t *testing.T) {
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
locker.EXPECT().NewLock("yes").Return(lock, nil),
|
locker.EXPECT().NewLock("yes").Return(lock, nil),
|
||||||
lock.EXPECT().Lock().Return(nil),
|
lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil),
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
Size: 20,
|
Size: 20,
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil),
|
||||||
lock.EXPECT().Unlock().Return(nil),
|
lock.EXPECT().Unlock().Return(nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -500,13 +499,13 @@ func TestPatch(t *testing.T) {
|
||||||
// We simulate that the upload has already an offset of 10 bytes. Therefore, the progress notifications
|
// We simulate that the upload has already an offset of 10 bytes. Therefore, the progress notifications
|
||||||
// must be the sum of the exisiting offset and the newly read bytes.
|
// must be the sum of the exisiting offset and the newly read bytes.
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 10,
|
Offset: 10,
|
||||||
Size: 100,
|
Size: 100,
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(10), NewReaderMatcher("first second third")).Return(int64(18), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(10), NewReaderMatcher("first second third")).Return(int64(18), nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -574,15 +573,15 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
Size: 100,
|
Size: 100,
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil),
|
||||||
store.EXPECT().AsTerminatableUpload(upload).Return(upload),
|
store.EXPECT().AsTerminatableUpload(upload).Return(upload),
|
||||||
upload.EXPECT().Terminate(context.Background()),
|
upload.EXPECT().Terminate(gomock.Any()),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -644,14 +643,14 @@ func TestPatch(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "yes").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "yes",
|
ID: "yes",
|
||||||
Offset: 0,
|
Offset: 0,
|
||||||
Size: 100,
|
Size: 100,
|
||||||
}, nil),
|
}, nil),
|
||||||
// The reader for WriteChunk must not return an error.
|
// The reader for WriteChunk must not return an error.
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
|
|
@ -2,7 +2,6 @@ package handler_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -20,7 +19,7 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
"foo": "hello",
|
"foo": "hello",
|
||||||
|
@ -28,7 +27,7 @@ func TestPost(t *testing.T) {
|
||||||
"empty": "",
|
"empty": "",
|
||||||
},
|
},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
|
@ -76,16 +75,16 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 0,
|
Size: 0,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 0,
|
Size: 0,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}, nil),
|
}, nil),
|
||||||
upload.EXPECT().FinishUpload(context.Background()).Return(nil),
|
upload.EXPECT().FinishUpload(gomock.Any()).Return(nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
handler, _ := NewHandler(Config{
|
handler, _ := NewHandler(Config{
|
||||||
|
@ -211,11 +210,11 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
|
@ -248,11 +247,11 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
|
@ -286,11 +285,11 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
|
@ -325,11 +324,11 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
|
@ -367,14 +366,14 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
"foo": "hello",
|
"foo": "hello",
|
||||||
"bar": "world",
|
"bar": "world",
|
||||||
},
|
},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{
|
MetaData: map[string]string{
|
||||||
|
@ -383,8 +382,8 @@ func TestPost(t *testing.T) {
|
||||||
},
|
},
|
||||||
}, nil),
|
}, nil),
|
||||||
locker.EXPECT().NewLock("foo").Return(lock, nil),
|
locker.EXPECT().NewLock("foo").Return(lock, nil),
|
||||||
lock.EXPECT().Lock().Return(nil),
|
lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil),
|
||||||
upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil),
|
upload.EXPECT().WriteChunk(gomock.Any(), int64(0), NewReaderMatcher("hello")).Return(int64(5), nil),
|
||||||
lock.EXPECT().Unlock().Return(nil),
|
lock.EXPECT().Unlock().Return(nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -420,11 +419,11 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
|
@ -454,11 +453,11 @@ func TestPost(t *testing.T) {
|
||||||
upload := NewMockFullUpload(ctrl)
|
upload := NewMockFullUpload(ctrl)
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
store.EXPECT().NewUpload(context.Background(), FileInfo{
|
store.EXPECT().NewUpload(gomock.Any(), FileInfo{
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
}).Return(upload, nil),
|
}).Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 300,
|
Size: 300,
|
||||||
MetaData: map[string]string{},
|
MetaData: map[string]string{},
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package handler_test
|
package handler_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -39,14 +38,14 @@ func TestTerminate(t *testing.T) {
|
||||||
|
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
locker.EXPECT().NewLock("foo").Return(lock, nil),
|
locker.EXPECT().NewLock("foo").Return(lock, nil),
|
||||||
lock.EXPECT().Lock().Return(nil),
|
lock.EXPECT().Lock(gomock.Any(), gomock.Any()).Return(nil),
|
||||||
store.EXPECT().GetUpload(context.Background(), "foo").Return(upload, nil),
|
store.EXPECT().GetUpload(gomock.Any(), "foo").Return(upload, nil),
|
||||||
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
|
upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{
|
||||||
ID: "foo",
|
ID: "foo",
|
||||||
Size: 10,
|
Size: 10,
|
||||||
}, nil),
|
}, nil),
|
||||||
store.EXPECT().AsTerminatableUpload(upload).Return(upload),
|
store.EXPECT().AsTerminatableUpload(upload).Return(upload),
|
||||||
upload.EXPECT().Terminate(context.Background()).Return(nil),
|
upload.EXPECT().Terminate(gomock.Any()).Return(nil),
|
||||||
lock.EXPECT().Unlock().Return(nil),
|
lock.EXPECT().Unlock().Return(nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -31,6 +31,7 @@ var (
|
||||||
ErrInvalidOffset = NewError("ERR_INVALID_OFFSET", "missing or invalid Upload-Offset header", http.StatusBadRequest)
|
ErrInvalidOffset = NewError("ERR_INVALID_OFFSET", "missing or invalid Upload-Offset header", http.StatusBadRequest)
|
||||||
ErrNotFound = NewError("ERR_UPLOAD_NOT_FOUND", "upload not found", http.StatusNotFound)
|
ErrNotFound = NewError("ERR_UPLOAD_NOT_FOUND", "upload not found", http.StatusNotFound)
|
||||||
ErrFileLocked = NewError("ERR_UPLOAD_LOCKED", "file currently locked", http.StatusLocked)
|
ErrFileLocked = NewError("ERR_UPLOAD_LOCKED", "file currently locked", http.StatusLocked)
|
||||||
|
ErrLockTimeout = NewError("ERR_LOCK_TIMEOUT", "failed to acquire lock before timeout", http.StatusInternalServerError)
|
||||||
ErrMismatchOffset = NewError("ERR_MISMATCHED_OFFSET", "mismatched offset", http.StatusConflict)
|
ErrMismatchOffset = NewError("ERR_MISMATCHED_OFFSET", "mismatched offset", http.StatusConflict)
|
||||||
ErrSizeExceeded = NewError("ERR_UPLOAD_SIZE_EXCEEDED", "upload's size exceeded", http.StatusRequestEntityTooLarge)
|
ErrSizeExceeded = NewError("ERR_UPLOAD_SIZE_EXCEEDED", "upload's size exceeded", http.StatusRequestEntityTooLarge)
|
||||||
ErrNotImplemented = NewError("ERR_NOT_IMPLEMENTED", "feature not implemented", http.StatusNotImplemented)
|
ErrNotImplemented = NewError("ERR_NOT_IMPLEMENTED", "feature not implemented", http.StatusNotImplemented)
|
||||||
|
@ -41,6 +42,7 @@ var (
|
||||||
ErrInvalidUploadDeferLength = NewError("ERR_INVALID_UPLOAD_LENGTH_DEFER", "invalid Upload-Defer-Length header", http.StatusBadRequest)
|
ErrInvalidUploadDeferLength = NewError("ERR_INVALID_UPLOAD_LENGTH_DEFER", "invalid Upload-Defer-Length header", http.StatusBadRequest)
|
||||||
ErrUploadStoppedByServer = NewError("ERR_UPLOAD_STOPPED", "upload has been stopped by server", http.StatusBadRequest)
|
ErrUploadStoppedByServer = NewError("ERR_UPLOAD_STOPPED", "upload has been stopped by server", http.StatusBadRequest)
|
||||||
ErrUploadRejectedByServer = NewError("ERR_UPLOAD_REJECTED", "upload creation has been rejected by server", http.StatusBadRequest)
|
ErrUploadRejectedByServer = NewError("ERR_UPLOAD_REJECTED", "upload creation has been rejected by server", http.StatusBadRequest)
|
||||||
|
ErrUploadInterrupted = NewError("ERR_UPLAOD_INTERRUPTED", "upload has been interrupted by another request for this upload resource", http.StatusBadRequest)
|
||||||
|
|
||||||
// TODO: These two responses are 500 for backwards compatability. We should discuss
|
// TODO: These two responses are 500 for backwards compatability. We should discuss
|
||||||
// whether it is better to more them to 4XX status codes.
|
// whether it is better to more them to 4XX status codes.
|
||||||
|
@ -194,7 +196,8 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler {
|
||||||
// will be ignored or interpreted as a rejection.
|
// will be ignored or interpreted as a rejection.
|
||||||
// For example, the Presto engine, which is used in older versions of
|
// For example, the Presto engine, which is used in older versions of
|
||||||
// Opera, Opera Mobile and Opera Mini, handles CORS this way.
|
// Opera, Opera Mobile and Opera Mini, handles CORS this way.
|
||||||
handler.sendResp(w, r, HTTPResponse{
|
c := newContext(w, r)
|
||||||
|
handler.sendResp(c, HTTPResponse{
|
||||||
StatusCode: http.StatusOK,
|
StatusCode: http.StatusOK,
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
@ -204,7 +207,8 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler {
|
||||||
// GET and HEAD methods are not checked since a browser may visit this URL and does
|
// GET and HEAD methods are not checked since a browser may visit this URL and does
|
||||||
// not include this header. GET requests are not part of the specification.
|
// not include this header. GET requests are not part of the specification.
|
||||||
if r.Method != "GET" && r.Method != "HEAD" && r.Header.Get("Tus-Resumable") != "1.0.0" {
|
if r.Method != "GET" && r.Method != "HEAD" && r.Header.Get("Tus-Resumable") != "1.0.0" {
|
||||||
handler.sendError(w, r, ErrUnsupportedVersion)
|
c := newContext(w, r)
|
||||||
|
handler.sendError(c, ErrUnsupportedVersion)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,7 +220,7 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler {
|
||||||
// PostFile creates a new file upload using the datastore after validating the
|
// PostFile creates a new file upload using the datastore after validating the
|
||||||
// length and parsing the metadata.
|
// length and parsing the metadata.
|
||||||
func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) {
|
func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
c := newContext(w, r)
|
||||||
|
|
||||||
// Check for presence of application/offset+octet-stream. If another content
|
// Check for presence of application/offset+octet-stream. If another content
|
||||||
// type is defined, it will be ignored and treated as none was set because
|
// type is defined, it will be ignored and treated as none was set because
|
||||||
|
@ -233,7 +237,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
||||||
// Parse Upload-Concat header
|
// Parse Upload-Concat header
|
||||||
isPartial, isFinal, partialUploadIDs, err := parseConcat(concatHeader)
|
isPartial, isFinal, partialUploadIDs, err := parseConcat(concatHeader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,13 +250,13 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
||||||
if isFinal {
|
if isFinal {
|
||||||
// A final upload must not contain a chunk within the creation request
|
// A final upload must not contain a chunk within the creation request
|
||||||
if containsChunk {
|
if containsChunk {
|
||||||
handler.sendError(w, r, ErrModifyFinal)
|
handler.sendError(c, ErrModifyFinal)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
partialUploads, size, err = handler.sizeOfUploads(ctx, partialUploadIDs)
|
partialUploads, size, err = handler.sizeOfUploads(c, partialUploadIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -260,14 +264,14 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
||||||
uploadDeferLengthHeader := r.Header.Get("Upload-Defer-Length")
|
uploadDeferLengthHeader := r.Header.Get("Upload-Defer-Length")
|
||||||
size, sizeIsDeferred, err = handler.validateNewUploadLengthHeaders(uploadLengthHeader, uploadDeferLengthHeader)
|
size, sizeIsDeferred, err = handler.validateNewUploadLengthHeaders(uploadLengthHeader, uploadDeferLengthHeader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test whether the size is still allowed
|
// Test whether the size is still allowed
|
||||||
if handler.config.MaxSize > 0 && size > handler.config.MaxSize {
|
if handler.config.MaxSize > 0 && size > handler.config.MaxSize {
|
||||||
handler.sendError(w, r, ErrMaxSizeExceeded)
|
handler.sendError(c, ErrMaxSizeExceeded)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,21 +295,21 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
||||||
if handler.config.PreUploadCreateCallback != nil {
|
if handler.config.PreUploadCreateCallback != nil {
|
||||||
resp2, err := handler.config.PreUploadCreateCallback(newHookEvent(info, r))
|
resp2, err := handler.config.PreUploadCreateCallback(newHookEvent(info, r))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp = resp.MergeWith(resp2)
|
resp = resp.MergeWith(resp2)
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := handler.composer.Core.NewUpload(ctx, info)
|
upload, err := handler.composer.Core.NewUpload(c, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err = upload.GetInfo(ctx)
|
info, err = upload.GetInfo(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -325,8 +329,8 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
if isFinal {
|
if isFinal {
|
||||||
concatableUpload := handler.composer.Concater.AsConcatableUpload(upload)
|
concatableUpload := handler.composer.Concater.AsConcatableUpload(upload)
|
||||||
if err := concatableUpload.ConcatUploads(ctx, partialUploads); err != nil {
|
if err := concatableUpload.ConcatUploads(c, partialUploads); err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
info.Offset = size
|
info.Offset = size
|
||||||
|
@ -338,63 +342,63 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
if containsChunk {
|
if containsChunk {
|
||||||
if handler.composer.UsesLocker {
|
if handler.composer.UsesLocker {
|
||||||
lock, err := handler.lockUpload(id)
|
lock, err := handler.lockUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err = handler.writeChunk(ctx, upload, info, resp, r)
|
resp, err = handler.writeChunk(c, resp, upload, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else if !sizeIsDeferred && size == 0 {
|
} else if !sizeIsDeferred && size == 0 {
|
||||||
// Directly finish the upload if the upload is empty (i.e. has a size of 0).
|
// Directly finish the upload if the upload is empty (i.e. has a size of 0).
|
||||||
// This statement is in an else-if block to avoid causing duplicate calls
|
// This statement is in an else-if block to avoid causing duplicate calls
|
||||||
// to finishUploadIfComplete if an upload is empty and contains a chunk.
|
// to finishUploadIfComplete if an upload is empty and contains a chunk.
|
||||||
resp, err = handler.finishUploadIfComplete(ctx, upload, info, resp, r)
|
resp, err = handler.finishUploadIfComplete(c, resp, upload, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.sendResp(w, r, resp)
|
handler.sendResp(c, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeadFile returns the length and offset for the HEAD request
|
// HeadFile returns the length and offset for the HEAD request
|
||||||
func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request) {
|
func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
c := newContext(w, r)
|
||||||
|
|
||||||
id, err := extractIDFromPath(r.URL.Path)
|
id, err := extractIDFromPath(r.URL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if handler.composer.UsesLocker {
|
if handler.composer.UsesLocker {
|
||||||
lock, err := handler.lockUpload(id)
|
lock, err := handler.lockUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := handler.composer.Core.GetUpload(ctx, id)
|
upload, err := handler.composer.Core.GetUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := upload.GetInfo(ctx)
|
info, err := upload.GetInfo(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,63 +436,63 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
resp.Headers["Cache-Control"] = "no-store"
|
resp.Headers["Cache-Control"] = "no-store"
|
||||||
resp.Headers["Upload-Offset"] = strconv.FormatInt(info.Offset, 10)
|
resp.Headers["Upload-Offset"] = strconv.FormatInt(info.Offset, 10)
|
||||||
handler.sendResp(w, r, resp)
|
handler.sendResp(c, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PatchFile adds a chunk to an upload. This operation is only allowed
|
// PatchFile adds a chunk to an upload. This operation is only allowed
|
||||||
// if enough space in the upload is left.
|
// if enough space in the upload is left.
|
||||||
func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) {
|
func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
c := newContext(w, r)
|
||||||
|
|
||||||
// Check for presence of application/offset+octet-stream
|
// Check for presence of application/offset+octet-stream
|
||||||
if r.Header.Get("Content-Type") != "application/offset+octet-stream" {
|
if r.Header.Get("Content-Type") != "application/offset+octet-stream" {
|
||||||
handler.sendError(w, r, ErrInvalidContentType)
|
handler.sendError(c, ErrInvalidContentType)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for presence of a valid Upload-Offset Header
|
// Check for presence of a valid Upload-Offset Header
|
||||||
offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64)
|
offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64)
|
||||||
if err != nil || offset < 0 {
|
if err != nil || offset < 0 {
|
||||||
handler.sendError(w, r, ErrInvalidOffset)
|
handler.sendError(c, ErrInvalidOffset)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := extractIDFromPath(r.URL.Path)
|
id, err := extractIDFromPath(r.URL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if handler.composer.UsesLocker {
|
if handler.composer.UsesLocker {
|
||||||
lock, err := handler.lockUpload(id)
|
lock, err := handler.lockUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := handler.composer.Core.GetUpload(ctx, id)
|
upload, err := handler.composer.Core.GetUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := upload.GetInfo(ctx)
|
info, err := upload.GetInfo(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Modifying a final upload is not allowed
|
// Modifying a final upload is not allowed
|
||||||
if info.IsFinal {
|
if info.IsFinal {
|
||||||
handler.sendError(w, r, ErrModifyFinal)
|
handler.sendError(c, ErrModifyFinal)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset != info.Offset {
|
if offset != info.Offset {
|
||||||
handler.sendError(w, r, ErrMismatchOffset)
|
handler.sendError(c, ErrMismatchOffset)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,28 +504,28 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
|
||||||
// Do not proxy the call to the data store if the upload is already completed
|
// Do not proxy the call to the data store if the upload is already completed
|
||||||
if !info.SizeIsDeferred && info.Offset == info.Size {
|
if !info.SizeIsDeferred && info.Offset == info.Size {
|
||||||
resp.Headers["Upload-Offset"] = strconv.FormatInt(offset, 10)
|
resp.Headers["Upload-Offset"] = strconv.FormatInt(offset, 10)
|
||||||
handler.sendResp(w, r, resp)
|
handler.sendResp(c, resp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Header.Get("Upload-Length") != "" {
|
if r.Header.Get("Upload-Length") != "" {
|
||||||
if !handler.composer.UsesLengthDeferrer {
|
if !handler.composer.UsesLengthDeferrer {
|
||||||
handler.sendError(w, r, ErrNotImplemented)
|
handler.sendError(c, ErrNotImplemented)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !info.SizeIsDeferred {
|
if !info.SizeIsDeferred {
|
||||||
handler.sendError(w, r, ErrInvalidUploadLength)
|
handler.sendError(c, ErrInvalidUploadLength)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
uploadLength, err := strconv.ParseInt(r.Header.Get("Upload-Length"), 10, 64)
|
uploadLength, err := strconv.ParseInt(r.Header.Get("Upload-Length"), 10, 64)
|
||||||
if err != nil || uploadLength < 0 || uploadLength < info.Offset || (handler.config.MaxSize > 0 && uploadLength > handler.config.MaxSize) {
|
if err != nil || uploadLength < 0 || uploadLength < info.Offset || (handler.config.MaxSize > 0 && uploadLength > handler.config.MaxSize) {
|
||||||
handler.sendError(w, r, ErrInvalidUploadLength)
|
handler.sendError(c, ErrInvalidUploadLength)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lengthDeclarableUpload := handler.composer.LengthDeferrer.AsLengthDeclarableUpload(upload)
|
lengthDeclarableUpload := handler.composer.LengthDeferrer.AsLengthDeclarableUpload(upload)
|
||||||
if err := lengthDeclarableUpload.DeclareLength(ctx, uploadLength); err != nil {
|
if err := lengthDeclarableUpload.DeclareLength(c, uploadLength); err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -529,20 +533,21 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
|
||||||
info.SizeIsDeferred = false
|
info.SizeIsDeferred = false
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err = handler.writeChunk(ctx, upload, info, resp, r)
|
resp, err = handler.writeChunk(c, resp, upload, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.sendResp(w, r, resp)
|
handler.sendResp(c, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeChunk reads the body from the requests r and appends it to the upload
|
// writeChunk reads the body from the requests r and appends it to the upload
|
||||||
// with the corresponding id. Afterwards, it will set the necessary response
|
// with the corresponding id. Afterwards, it will set the necessary response
|
||||||
// headers but will not send the response.
|
// headers but will not send the response.
|
||||||
func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, info FileInfo, resp HTTPResponse, r *http.Request) (HTTPResponse, error) {
|
func (handler *UnroutedHandler) writeChunk(c *httpContext, resp HTTPResponse, upload Upload, info FileInfo) (HTTPResponse, error) {
|
||||||
// Get Content-Length if possible
|
// Get Content-Length if possible
|
||||||
|
r := c.req
|
||||||
length := r.ContentLength
|
length := r.ContentLength
|
||||||
offset := info.Offset
|
offset := info.Offset
|
||||||
id := info.ID
|
id := info.ID
|
||||||
|
@ -577,7 +582,7 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i
|
||||||
// available in the case of a malicious request.
|
// available in the case of a malicious request.
|
||||||
if r.Body != nil {
|
if r.Body != nil {
|
||||||
// Limit the data read from the request's body to the allowed maximum
|
// Limit the data read from the request's body to the allowed maximum
|
||||||
reader := newBodyReader(io.LimitReader(r.Body, maxSize))
|
c.body = newBodyReader(r.Body, maxSize)
|
||||||
|
|
||||||
// We use a context object to allow the hook system to cancel an upload
|
// We use a context object to allow the hook system to cancel an upload
|
||||||
uploadCtx, stopUpload := context.WithCancel(context.Background())
|
uploadCtx, stopUpload := context.WithCancel(context.Background())
|
||||||
|
@ -592,18 +597,19 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i
|
||||||
go func() {
|
go func() {
|
||||||
// Interrupt the Read() call from the request body
|
// Interrupt the Read() call from the request body
|
||||||
<-uploadCtx.Done()
|
<-uploadCtx.Done()
|
||||||
|
// TODO: Consider using CloseWithError function from BodyReader
|
||||||
terminateUpload = true
|
terminateUpload = true
|
||||||
r.Body.Close()
|
r.Body.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if handler.config.NotifyUploadProgress {
|
if handler.config.NotifyUploadProgress {
|
||||||
stopProgressEvents := handler.sendProgressMessages(newHookEvent(info, r), reader)
|
stopProgressEvents := handler.sendProgressMessages(newHookEvent(info, r), c.body)
|
||||||
defer close(stopProgressEvents)
|
defer close(stopProgressEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
bytesWritten, err = upload.WriteChunk(ctx, offset, reader)
|
bytesWritten, err = upload.WriteChunk(c, offset, c.body)
|
||||||
if terminateUpload && handler.composer.UsesTerminater {
|
if terminateUpload && handler.composer.UsesTerminater {
|
||||||
if terminateErr := handler.terminateUpload(ctx, upload, info, r); terminateErr != nil {
|
if terminateErr := handler.terminateUpload(c, upload, info); terminateErr != nil {
|
||||||
// We only log this error and not show it to the user since this
|
// We only log this error and not show it to the user since this
|
||||||
// termination error is not relevant to the uploading client
|
// termination error is not relevant to the uploading client
|
||||||
handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error())
|
handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error())
|
||||||
|
@ -612,7 +618,7 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i
|
||||||
|
|
||||||
// If we encountered an error while reading the body from the HTTP request, log it, but only include
|
// If we encountered an error while reading the body from the HTTP request, log it, but only include
|
||||||
// it in the response, if the store did not also return an error.
|
// it in the response, if the store did not also return an error.
|
||||||
if bodyErr := reader.hasError(); bodyErr != nil {
|
if bodyErr := c.body.hasError(); bodyErr != nil {
|
||||||
handler.log("BodyReadError", "id", id, "error", bodyErr.Error())
|
handler.log("BodyReadError", "id", id, "error", bodyErr.Error())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = bodyErr
|
err = bodyErr
|
||||||
|
@ -638,17 +644,19 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i
|
||||||
handler.Metrics.incBytesReceived(uint64(bytesWritten))
|
handler.Metrics.incBytesReceived(uint64(bytesWritten))
|
||||||
info.Offset = newOffset
|
info.Offset = newOffset
|
||||||
|
|
||||||
return handler.finishUploadIfComplete(ctx, upload, info, resp, r)
|
return handler.finishUploadIfComplete(c, resp, upload, info)
|
||||||
}
|
}
|
||||||
|
|
||||||
// finishUploadIfComplete checks whether an upload is completed (i.e. upload offset
|
// finishUploadIfComplete checks whether an upload is completed (i.e. upload offset
|
||||||
// matches upload size) and if so, it will call the data store's FinishUpload
|
// matches upload size) and if so, it will call the data store's FinishUpload
|
||||||
// function and send the necessary message on the CompleteUpload channel.
|
// function and send the necessary message on the CompleteUpload channel.
|
||||||
func (handler *UnroutedHandler) finishUploadIfComplete(ctx context.Context, upload Upload, info FileInfo, resp HTTPResponse, r *http.Request) (HTTPResponse, error) {
|
func (handler *UnroutedHandler) finishUploadIfComplete(c *httpContext, resp HTTPResponse, upload Upload, info FileInfo) (HTTPResponse, error) {
|
||||||
|
r := c.req
|
||||||
|
|
||||||
// If the upload is completed, ...
|
// If the upload is completed, ...
|
||||||
if !info.SizeIsDeferred && info.Offset == info.Size {
|
if !info.SizeIsDeferred && info.Offset == info.Size {
|
||||||
// ... allow custom mechanism to finish and cleanup the upload
|
// ... allow custom mechanism to finish and cleanup the upload
|
||||||
if err := upload.FinishUpload(ctx); err != nil {
|
if err := upload.FinishUpload(c); err != nil {
|
||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -674,33 +682,33 @@ func (handler *UnroutedHandler) finishUploadIfComplete(ctx context.Context, uplo
|
||||||
// GetFile handles requests to download a file using a GET request. This is not
|
// GetFile handles requests to download a file using a GET request. This is not
|
||||||
// part of the specification.
|
// part of the specification.
|
||||||
func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request) {
|
func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
c := newContext(w, r)
|
||||||
|
|
||||||
id, err := extractIDFromPath(r.URL.Path)
|
id, err := extractIDFromPath(r.URL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if handler.composer.UsesLocker {
|
if handler.composer.UsesLocker {
|
||||||
lock, err := handler.lockUpload(id)
|
lock, err := handler.lockUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := handler.composer.Core.GetUpload(ctx, id)
|
upload, err := handler.composer.Core.GetUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
info, err := upload.GetInfo(ctx)
|
info, err := upload.GetInfo(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -718,17 +726,17 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
|
||||||
// If no data has been uploaded yet, respond with an empty "204 No Content" status.
|
// If no data has been uploaded yet, respond with an empty "204 No Content" status.
|
||||||
if info.Offset == 0 {
|
if info.Offset == 0 {
|
||||||
resp.StatusCode = http.StatusNoContent
|
resp.StatusCode = http.StatusNoContent
|
||||||
handler.sendResp(w, r, resp)
|
handler.sendResp(c, resp)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
src, err := upload.GetReader(ctx)
|
src, err := upload.GetReader(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.sendResp(w, r, resp)
|
handler.sendResp(c, resp)
|
||||||
io.Copy(w, src)
|
io.Copy(w, src)
|
||||||
|
|
||||||
// Try to close the reader if the io.Closer interface is implemented
|
// Try to close the reader if the io.Closer interface is implemented
|
||||||
|
@ -800,52 +808,52 @@ func filterContentType(info FileInfo) (contentType string, contentDisposition st
|
||||||
|
|
||||||
// DelFile terminates an upload permanently.
|
// DelFile terminates an upload permanently.
|
||||||
func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) {
|
func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
c := newContext(w, r)
|
||||||
|
|
||||||
// Abort the request handling if the required interface is not implemented
|
// Abort the request handling if the required interface is not implemented
|
||||||
if !handler.composer.UsesTerminater {
|
if !handler.composer.UsesTerminater {
|
||||||
handler.sendError(w, r, ErrNotImplemented)
|
handler.sendError(c, ErrNotImplemented)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := extractIDFromPath(r.URL.Path)
|
id, err := extractIDFromPath(r.URL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if handler.composer.UsesLocker {
|
if handler.composer.UsesLocker {
|
||||||
lock, err := handler.lockUpload(id)
|
lock, err := handler.lockUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := handler.composer.Core.GetUpload(ctx, id)
|
upload, err := handler.composer.Core.GetUpload(c, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var info FileInfo
|
var info FileInfo
|
||||||
if handler.config.NotifyTerminatedUploads {
|
if handler.config.NotifyTerminatedUploads {
|
||||||
info, err = upload.GetInfo(ctx)
|
info, err = upload.GetInfo(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = handler.terminateUpload(ctx, upload, info, r)
|
err = handler.terminateUpload(c, upload, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
handler.sendError(w, r, err)
|
handler.sendError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.sendResp(w, r, HTTPResponse{
|
handler.sendResp(c, HTTPResponse{
|
||||||
StatusCode: http.StatusNoContent,
|
StatusCode: http.StatusNoContent,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -855,16 +863,16 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
|
||||||
// and updates the statistics.
|
// and updates the statistics.
|
||||||
// Note the the info argument is only needed if the terminated uploads
|
// Note the the info argument is only needed if the terminated uploads
|
||||||
// notifications are enabled.
|
// notifications are enabled.
|
||||||
func (handler *UnroutedHandler) terminateUpload(ctx context.Context, upload Upload, info FileInfo, r *http.Request) error {
|
func (handler *UnroutedHandler) terminateUpload(c *httpContext, upload Upload, info FileInfo) error {
|
||||||
terminatableUpload := handler.composer.Terminater.AsTerminatableUpload(upload)
|
terminatableUpload := handler.composer.Terminater.AsTerminatableUpload(upload)
|
||||||
|
|
||||||
err := terminatableUpload.Terminate(ctx)
|
err := terminatableUpload.Terminate(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if handler.config.NotifyTerminatedUploads {
|
if handler.config.NotifyTerminatedUploads {
|
||||||
handler.TerminatedUploads <- newHookEvent(info, r)
|
handler.TerminatedUploads <- newHookEvent(info, c.req)
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.Metrics.incUploadsTerminated()
|
handler.Metrics.incUploadsTerminated()
|
||||||
|
@ -874,7 +882,7 @@ func (handler *UnroutedHandler) terminateUpload(ctx context.Context, upload Uplo
|
||||||
|
|
||||||
// Send the error in the response body. The status code will be looked up in
|
// Send the error in the response body. The status code will be looked up in
|
||||||
// ErrStatusCodes. If none is found 500 Internal Error will be used.
|
// ErrStatusCodes. If none is found 500 Internal Error will be used.
|
||||||
func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request, err error) {
|
func (handler *UnroutedHandler) sendError(c *httpContext, err error) {
|
||||||
// Errors for read timeouts contain too much information which is not
|
// Errors for read timeouts contain too much information which is not
|
||||||
// necessary for us and makes grouping for the metrics harder. The error
|
// necessary for us and makes grouping for the metrics harder. The error
|
||||||
// message looks like: read tcp 127.0.0.1:1080->127.0.0.1:53673: i/o timeout
|
// message looks like: read tcp 127.0.0.1:1080->127.0.0.1:53673: i/o timeout
|
||||||
|
@ -909,6 +917,8 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request
|
||||||
// err = nil
|
// err = nil
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
r := c.req
|
||||||
|
|
||||||
detailedErr, ok := err.(Error)
|
detailedErr, ok := err.(Error)
|
||||||
if !ok {
|
if !ok {
|
||||||
handler.log("InternalServerError", "message", err.Error(), "method", r.Method, "path", r.URL.Path, "requestId", getRequestId(r))
|
handler.log("InternalServerError", "message", err.Error(), "method", r.Method, "path", r.URL.Path, "requestId", getRequestId(r))
|
||||||
|
@ -921,15 +931,15 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request
|
||||||
detailedErr.HTTPResponse.Body = ""
|
detailedErr.HTTPResponse.Body = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.sendResp(w, r, detailedErr.HTTPResponse)
|
handler.sendResp(c, detailedErr.HTTPResponse)
|
||||||
handler.Metrics.incErrorsTotal(detailedErr)
|
handler.Metrics.incErrorsTotal(detailedErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendResp writes the header to w with the specified status code.
|
// sendResp writes the header to w with the specified status code.
|
||||||
func (handler *UnroutedHandler) sendResp(w http.ResponseWriter, r *http.Request, resp HTTPResponse) {
|
func (handler *UnroutedHandler) sendResp(c *httpContext, resp HTTPResponse) {
|
||||||
resp.writeTo(w)
|
resp.writeTo(c.res)
|
||||||
|
|
||||||
handler.log("ResponseOutgoing", "status", strconv.Itoa(resp.StatusCode), "method", r.Method, "path", r.URL.Path, "requestId", getRequestId(r), "body", resp.Body)
|
handler.log("ResponseOutgoing", "status", strconv.Itoa(resp.StatusCode), "method", c.req.Method, "path", c.req.URL.Path, "requestId", getRequestId(c.req), "body", resp.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make an absolute URLs to the given upload id. If the base path is absolute
|
// Make an absolute URLs to the given upload id. If the base path is absolute
|
||||||
|
@ -1073,13 +1083,23 @@ func (handler *UnroutedHandler) validateNewUploadLengthHeaders(uploadLengthHeade
|
||||||
|
|
||||||
// lockUpload creates a new lock for the given upload ID and attempts to lock it.
|
// lockUpload creates a new lock for the given upload ID and attempts to lock it.
|
||||||
// The created lock is returned if it was aquired successfully.
|
// The created lock is returned if it was aquired successfully.
|
||||||
func (handler *UnroutedHandler) lockUpload(id string) (Lock, error) {
|
func (handler *UnroutedHandler) lockUpload(c *httpContext, id string) (Lock, error) {
|
||||||
lock, err := handler.composer.Locker.NewLock(id)
|
lock, err := handler.composer.Locker.NewLock(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := lock.Lock(); err != nil {
|
// TODO: Make lock timeout configurable
|
||||||
|
ctx, cancelContext := context.WithTimeout(context.Background(), 3*time.Second)
|
||||||
|
defer cancelContext()
|
||||||
|
releaseLock := func() {
|
||||||
|
if c.body != nil {
|
||||||
|
handler.log("UploadInterrupted", "id", id, "requestId", getRequestId(c.req))
|
||||||
|
c.body.closeWithError(ErrUploadInterrupted)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lock.Lock(ctx, releaseLock); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
// Package memorylocker provides an in-memory locking mechanism.
|
// Package memorylocker provides an in-memory locking mechanism.
|
||||||
//
|
//
|
||||||
|
// TODO: Update comment
|
||||||
// When multiple processes are attempting to access an upload, whether it be
|
// When multiple processes are attempting to access an upload, whether it be
|
||||||
// by reading or writing, a synchronization mechanism is required to prevent
|
// by reading or writing, a synchronization mechanism is required to prevent
|
||||||
// data corruption, especially to ensure correct offset values and the proper
|
// data corruption, especially to ensure correct offset values and the proper
|
||||||
|
@ -11,6 +12,7 @@
|
||||||
package memorylocker
|
package memorylocker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/tus/tusd/pkg/handler"
|
"github.com/tus/tusd/pkg/handler"
|
||||||
|
@ -20,14 +22,19 @@ import (
|
||||||
// cheap mechanism. Locks will only exist as long as this object is kept in
|
// cheap mechanism. Locks will only exist as long as this object is kept in
|
||||||
// reference and will be erased if the program exits.
|
// reference and will be erased if the program exits.
|
||||||
type MemoryLocker struct {
|
type MemoryLocker struct {
|
||||||
locks map[string]struct{}
|
locks map[string]lockEntry
|
||||||
mutex sync.Mutex
|
mutex sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
type lockEntry struct {
|
||||||
|
lockReleased chan struct{}
|
||||||
|
requestRelease func()
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new in-memory locker.
|
// New creates a new in-memory locker.
|
||||||
func New() *MemoryLocker {
|
func New() *MemoryLocker {
|
||||||
return &MemoryLocker{
|
return &MemoryLocker{
|
||||||
locks: make(map[string]struct{}),
|
locks: make(map[string]lockEntry),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,16 +53,40 @@ type memoryLock struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock tries to obtain the exclusive lock.
|
// Lock tries to obtain the exclusive lock.
|
||||||
func (lock memoryLock) Lock() error {
|
func (lock memoryLock) Lock(ctx context.Context, requestRelease func()) error {
|
||||||
lock.locker.mutex.Lock()
|
lock.locker.mutex.RLock()
|
||||||
defer lock.locker.mutex.Unlock()
|
entry, ok := lock.locker.locks[lock.id]
|
||||||
|
lock.locker.mutex.RUnlock()
|
||||||
|
|
||||||
// Ensure file is not locked
|
requestRelease:
|
||||||
if _, ok := lock.locker.locks[lock.id]; ok {
|
if ok {
|
||||||
return handler.ErrFileLocked
|
// TODO: Make this channel?
|
||||||
|
// TODO: Should we ensure this is only called once?
|
||||||
|
entry.requestRelease()
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return handler.ErrLockTimeout
|
||||||
|
case <-entry.lockReleased:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lock.locker.locks[lock.id] = struct{}{}
|
lock.locker.mutex.Lock()
|
||||||
|
// Check that the lock has not already been created in the meantime
|
||||||
|
entry, ok = lock.locker.locks[lock.id]
|
||||||
|
if ok {
|
||||||
|
// Lock has been created in the meantime, so we must wait again until it is free
|
||||||
|
lock.locker.mutex.Unlock()
|
||||||
|
goto requestRelease
|
||||||
|
}
|
||||||
|
|
||||||
|
// No lock exists, so we can create it
|
||||||
|
entry = lockEntry{
|
||||||
|
lockReleased: make(chan struct{}),
|
||||||
|
requestRelease: requestRelease,
|
||||||
|
}
|
||||||
|
|
||||||
|
lock.locker.locks[lock.id] = entry
|
||||||
|
lock.locker.mutex.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -64,10 +95,14 @@ func (lock memoryLock) Lock() error {
|
||||||
func (lock memoryLock) Unlock() error {
|
func (lock memoryLock) Unlock() error {
|
||||||
lock.locker.mutex.Lock()
|
lock.locker.mutex.Lock()
|
||||||
|
|
||||||
// Deleting a non-existing key does not end in unexpected errors or panic
|
lockReleased := lock.locker.locks[lock.id].lockReleased
|
||||||
// since this operation results in a no-op
|
|
||||||
|
// Delete the lock entry entirely
|
||||||
delete(lock.locker.locks, lock.id)
|
delete(lock.locker.locks, lock.id)
|
||||||
|
|
||||||
lock.locker.mutex.Unlock()
|
lock.locker.mutex.Unlock()
|
||||||
|
|
||||||
|
close(lockReleased)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,17 @@
|
||||||
package memorylocker
|
package memorylocker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/tus/tusd/pkg/handler"
|
"github.com/tus/tusd/pkg/handler"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ handler.Locker = &MemoryLocker{}
|
var _ handler.Locker = &MemoryLocker{}
|
||||||
|
|
||||||
func TestMemoryLocker(t *testing.T) {
|
func TestMemoryLocker_LockAndUnlock(t *testing.T) {
|
||||||
a := assert.New(t)
|
a := assert.New(t)
|
||||||
|
|
||||||
locker := New()
|
locker := New()
|
||||||
|
@ -18,13 +19,62 @@ func TestMemoryLocker(t *testing.T) {
|
||||||
lock1, err := locker.NewLock("one")
|
lock1, err := locker.NewLock("one")
|
||||||
a.NoError(err)
|
a.NoError(err)
|
||||||
|
|
||||||
a.NoError(lock1.Lock())
|
a.NoError(lock1.Lock(context.Background(), func() {
|
||||||
a.Equal(handler.ErrFileLocked, lock1.Lock())
|
panic("must not be called")
|
||||||
|
}))
|
||||||
|
a.NoError(lock1.Unlock())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryLocker_Timeout(t *testing.T) {
|
||||||
|
a := assert.New(t)
|
||||||
|
|
||||||
|
locker := New()
|
||||||
|
releaseRequestCalled := false
|
||||||
|
|
||||||
|
lock1, err := locker.NewLock("one")
|
||||||
|
a.NoError(err)
|
||||||
|
a.NoError(lock1.Lock(context.Background(), func() {
|
||||||
|
releaseRequestCalled = true
|
||||||
|
// We note that the function has been called, but do not
|
||||||
|
// release the lock
|
||||||
|
}))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
lock2, err := locker.NewLock("one")
|
lock2, err := locker.NewLock("one")
|
||||||
a.NoError(err)
|
a.NoError(err)
|
||||||
a.Equal(handler.ErrFileLocked, lock2.Lock())
|
err = lock2.Lock(ctx, func() {
|
||||||
|
panic("must not be called")
|
||||||
|
})
|
||||||
|
|
||||||
a.NoError(lock1.Unlock())
|
a.Equal(err, handler.ErrLockTimeout)
|
||||||
a.NoError(lock1.Unlock())
|
a.True(releaseRequestCalled)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemoryLocker_RequestUnlock(t *testing.T) {
|
||||||
|
a := assert.New(t)
|
||||||
|
|
||||||
|
locker := New()
|
||||||
|
releaseRequestCalled := false
|
||||||
|
|
||||||
|
lock1, err := locker.NewLock("one")
|
||||||
|
a.NoError(err)
|
||||||
|
a.NoError(lock1.Lock(context.Background(), func() {
|
||||||
|
releaseRequestCalled = true
|
||||||
|
<-time.After(10 * time.Millisecond)
|
||||||
|
a.NoError(lock1.Unlock())
|
||||||
|
}))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
lock2, err := locker.NewLock("one")
|
||||||
|
a.NoError(err)
|
||||||
|
a.NoError(lock2.Lock(ctx, func() {
|
||||||
|
panic("must not be called")
|
||||||
|
}))
|
||||||
|
a.NoError(lock2.Unlock())
|
||||||
|
|
||||||
|
a.True(releaseRequestCalled)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue