core: Pass Upload interfaces to ConcatUploads
This commit is contained in:
parent
6b21772107
commit
b73d634b47
|
@ -116,6 +116,10 @@ func (store FileStore) AsLengthDeclarableUpload(upload handler.Upload) handler.L
|
|||
return upload.(*fileUpload)
|
||||
}
|
||||
|
||||
func (store FileStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
|
||||
return upload.(*fileUpload)
|
||||
}
|
||||
|
||||
// binPath returns the path to the file storing the binary data.
|
||||
func (store FileStore) binPath(id string) string {
|
||||
return filepath.Join(store.Path, id)
|
||||
|
@ -175,15 +179,17 @@ func (upload *fileUpload) Terminate(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (store FileStore) ConcatUploads(ctx context.Context, dest string, uploads []string) (err error) {
|
||||
file, err := os.OpenFile(store.binPath(dest), os.O_WRONLY|os.O_APPEND, defaultFilePerm)
|
||||
func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []handler.Upload) (err error) {
|
||||
file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
for _, id := range uploads {
|
||||
src, err := os.Open(store.binPath(id))
|
||||
for _, partialUpload := range uploads {
|
||||
fileUpload := partialUpload.(*fileUpload)
|
||||
|
||||
src, err := os.Open(fileUpload.binPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ func TestConcatUploads(t *testing.T) {
|
|||
finId := finInfo.ID
|
||||
|
||||
// Create three uploads for concatenating
|
||||
ids := make([]string, 3)
|
||||
partialUploads := make([]handler.Upload, 3)
|
||||
contents := []string{
|
||||
"abc",
|
||||
"def",
|
||||
|
@ -122,13 +122,10 @@ func TestConcatUploads(t *testing.T) {
|
|||
a.NoError(err)
|
||||
a.EqualValues(3, n)
|
||||
|
||||
info, err := upload.GetInfo(ctx)
|
||||
a.NoError(err)
|
||||
|
||||
ids[i] = info.ID
|
||||
partialUploads[i] = upload
|
||||
}
|
||||
|
||||
err = store.ConcatUploads(ctx, finId, ids)
|
||||
err = store.AsConcatableUpload(finUpload).ConcatUploads(ctx, partialUploads)
|
||||
a.NoError(err)
|
||||
|
||||
// Check offset
|
||||
|
|
|
@ -141,7 +141,8 @@ func TestConcat(t *testing.T) {
|
|||
PartialUploads: []string{"a", "b"},
|
||||
MetaData: make(map[string]string),
|
||||
}, nil),
|
||||
store.EXPECT().ConcatUploads(context.Background(), "foo", []string{"a", "b"}).Return(nil),
|
||||
store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC),
|
||||
uploadC.EXPECT().ConcatUploads(context.Background(), []Upload{uploadA, uploadB}).Return(nil),
|
||||
)
|
||||
|
||||
handler, _ := NewHandler(Config{
|
||||
|
|
|
@ -105,13 +105,17 @@ type TerminaterDataStore interface {
|
|||
// Concatenation extension should be enabled. Only in this case, the handler
|
||||
// will parse and respect the Upload-Concat header.
|
||||
type ConcaterDataStore interface {
|
||||
// ConcatUploads concatenations the content from the provided partial uploads
|
||||
// and write the result in the destination upload which is specified by its
|
||||
// ID. The caller (usually the handler) must and will ensure that this
|
||||
AsConcatableUpload(upload Upload) ConcatableUpload
|
||||
}
|
||||
|
||||
type ConcatableUpload interface {
|
||||
// ConcatUploads concatenates the content from the provided partial uploads
|
||||
// and writes the result in the destination upload.
|
||||
// The caller (usually the handler) must and will ensure that this
|
||||
// destination upload has been created before with enough space to hold all
|
||||
// partial uploads. The order, in which the partial uploads are supplied,
|
||||
// must be respected during concatenation.
|
||||
ConcatUploads(ctx context.Context, destination string, partialUploads []string) error
|
||||
ConcatUploads(ctx context.Context, partialUploads []Upload) error
|
||||
}
|
||||
|
||||
// LengthDeferrerDataStore is the interface that must be implemented if the
|
||||
|
|
|
@ -79,18 +79,18 @@ func (mr *MockFullDataStoreMockRecorder) AsTerminatableUpload(upload interface{}
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsTerminatableUpload", reflect.TypeOf((*MockFullDataStore)(nil).AsTerminatableUpload), upload)
|
||||
}
|
||||
|
||||
// ConcatUploads mocks base method
|
||||
func (m *MockFullDataStore) ConcatUploads(ctx context.Context, destination string, partialUploads []string) error {
|
||||
// AsConcatableUpload mocks base method
|
||||
func (m *MockFullDataStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ConcatUploads", ctx, destination, partialUploads)
|
||||
ret0, _ := ret[0].(error)
|
||||
ret := m.ctrl.Call(m, "AsConcatableUpload", upload)
|
||||
ret0, _ := ret[0].(handler.ConcatableUpload)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ConcatUploads indicates an expected call of ConcatUploads
|
||||
func (mr *MockFullDataStoreMockRecorder) ConcatUploads(ctx, destination, partialUploads interface{}) *gomock.Call {
|
||||
// AsConcatableUpload indicates an expected call of AsConcatableUpload
|
||||
func (mr *MockFullDataStoreMockRecorder) AsConcatableUpload(upload interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConcatUploads", reflect.TypeOf((*MockFullDataStore)(nil).ConcatUploads), ctx, destination, partialUploads)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsConcatableUpload", reflect.TypeOf((*MockFullDataStore)(nil).AsConcatableUpload), upload)
|
||||
}
|
||||
|
||||
// AsLengthDeclarableUpload mocks base method
|
||||
|
@ -217,6 +217,20 @@ func (mr *MockFullUploadMockRecorder) DeclareLength(ctx, length interface{}) *go
|
|||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeclareLength", reflect.TypeOf((*MockFullUpload)(nil).DeclareLength), ctx, length)
|
||||
}
|
||||
|
||||
// ConcatUploads mocks base method
|
||||
func (m *MockFullUpload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ConcatUploads", ctx, partialUploads)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ConcatUploads indicates an expected call of ConcatUploads
|
||||
func (mr *MockFullUploadMockRecorder) ConcatUploads(ctx, partialUploads interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConcatUploads", reflect.TypeOf((*MockFullUpload)(nil).ConcatUploads), ctx, partialUploads)
|
||||
}
|
||||
|
||||
// MockFullLocker is a mock of FullLocker interface
|
||||
type MockFullLocker struct {
|
||||
ctrl *gomock.Controller
|
||||
|
|
|
@ -289,7 +289,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
// Parse Upload-Concat header
|
||||
isPartial, isFinal, partialUploads, err := parseConcat(concatHeader)
|
||||
isPartial, isFinal, partialUploadIDs, err := parseConcat(concatHeader)
|
||||
if err != nil {
|
||||
handler.sendError(w, r, err)
|
||||
return
|
||||
|
@ -300,6 +300,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
|||
// Upload-Length header)
|
||||
var size int64
|
||||
var sizeIsDeferred bool
|
||||
var partialUploads []Upload
|
||||
if isFinal {
|
||||
// A final upload must not contain a chunk within the creation request
|
||||
if containsChunk {
|
||||
|
@ -307,7 +308,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
|||
return
|
||||
}
|
||||
|
||||
size, err = handler.sizeOfUploads(ctx, partialUploads)
|
||||
partialUploads, size, err = handler.sizeOfUploads(ctx, partialUploadIDs)
|
||||
if err != nil {
|
||||
handler.sendError(w, r, err)
|
||||
return
|
||||
|
@ -337,7 +338,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
|||
MetaData: meta,
|
||||
IsPartial: isPartial,
|
||||
IsFinal: isFinal,
|
||||
PartialUploads: partialUploads,
|
||||
PartialUploads: partialUploadIDs,
|
||||
}
|
||||
|
||||
if handler.config.PreUploadCreateCallback != nil {
|
||||
|
@ -374,7 +375,8 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
|
||||
if isFinal {
|
||||
if err := handler.composer.Concater.ConcatUploads(ctx, id, partialUploads); err != nil {
|
||||
concatableUpload := handler.composer.Concater.AsConcatableUpload(upload)
|
||||
if err := concatableUpload.ConcatUploads(ctx, partialUploads); err != nil {
|
||||
handler.sendError(w, r, err)
|
||||
return
|
||||
}
|
||||
|
@ -1034,24 +1036,27 @@ func getHostAndProtocol(r *http.Request, allowForwarded bool) (host, proto strin
|
|||
// The get sum of all sizes for a list of upload ids while checking whether
|
||||
// all of these uploads are finished yet. This is used to calculate the size
|
||||
// of a final resource.
|
||||
func (handler *UnroutedHandler) sizeOfUploads(ctx context.Context, ids []string) (size int64, err error) {
|
||||
for _, id := range ids {
|
||||
func (handler *UnroutedHandler) sizeOfUploads(ctx context.Context, ids []string) (partialUploads []Upload, size int64, err error) {
|
||||
partialUploads = make([]Upload, len(ids))
|
||||
|
||||
for i, id := range ids {
|
||||
upload, err := handler.composer.Core.GetUpload(ctx, id)
|
||||
if err != nil {
|
||||
return size, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
info, err := upload.GetInfo(ctx)
|
||||
if err != nil {
|
||||
return size, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if info.SizeIsDeferred || info.Offset != info.Size {
|
||||
err = ErrUploadNotFinished
|
||||
return size, err
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
size += info.Size
|
||||
partialUploads[i] = upload
|
||||
}
|
||||
|
||||
return
|
||||
|
|
|
@ -32,6 +32,7 @@ type FullUpload interface {
|
|||
handler.Upload
|
||||
handler.TerminatableUpload
|
||||
handler.LengthDeclarableUpload
|
||||
handler.ConcatableUpload
|
||||
}
|
||||
|
||||
type FullLocker interface {
|
||||
|
|
|
@ -241,6 +241,10 @@ func (store S3Store) AsLengthDeclarableUpload(upload handler.Upload) handler.Len
|
|||
return upload.(*s3Upload)
|
||||
}
|
||||
|
||||
func (store S3Store) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
|
||||
return upload.(*s3Upload)
|
||||
}
|
||||
|
||||
func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {
|
||||
id := upload.id
|
||||
store := upload.store
|
||||
|
@ -582,8 +586,10 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUploads []string) error {
|
||||
uploadId, multipartId := splitIds(dest)
|
||||
func (upload *s3Upload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error {
|
||||
id := upload.id
|
||||
store := upload.store
|
||||
uploadId, multipartId := splitIds(id)
|
||||
|
||||
numPartialUploads := len(partialUploads)
|
||||
errs := make([]error, 0, numPartialUploads)
|
||||
|
@ -591,12 +597,13 @@ func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUplo
|
|||
// Copy partial uploads concurrently
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numPartialUploads)
|
||||
for i, partialId := range partialUploads {
|
||||
for i, partialUpload := range partialUploads {
|
||||
partialS3Upload := partialUpload.(*s3Upload)
|
||||
partialId, _ := splitIds(partialS3Upload.id)
|
||||
|
||||
go func(i int, partialId string) {
|
||||
defer wg.Done()
|
||||
|
||||
partialUploadId, _ := splitIds(partialId)
|
||||
|
||||
_, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: store.keyWithPrefix(uploadId),
|
||||
|
@ -604,7 +611,7 @@ func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUplo
|
|||
// Part numbers must be in the range of 1 to 10000, inclusive. Since
|
||||
// slice indexes start at 0, we add 1 to ensure that i >= 1.
|
||||
PartNumber: aws.Int64(int64(i + 1)),
|
||||
CopySource: aws.String(store.Bucket + "/" + partialUploadId),
|
||||
CopySource: aws.String(store.Bucket + "/" + partialId),
|
||||
})
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
|
@ -619,10 +626,6 @@ func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUplo
|
|||
return newMultiError(errs)
|
||||
}
|
||||
|
||||
upload, err := store.GetUpload(ctx, dest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return upload.FinishUpload(ctx)
|
||||
}
|
||||
|
||||
|
|
|
@ -1125,10 +1125,20 @@ func TestConcatUploads(t *testing.T) {
|
|||
}).Return(nil, nil),
|
||||
)
|
||||
|
||||
err := store.ConcatUploads(context.Background(), "uploadId+multipartId", []string{
|
||||
"aaa+AAA",
|
||||
"bbb+BBB",
|
||||
"ccc+CCC",
|
||||
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
|
||||
assert.Nil(err)
|
||||
|
||||
uploadA, err := store.GetUpload(context.Background(), "aaa+AAA")
|
||||
assert.Nil(err)
|
||||
uploadB, err := store.GetUpload(context.Background(), "bbb+BBB")
|
||||
assert.Nil(err)
|
||||
uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
|
||||
assert.Nil(err)
|
||||
|
||||
err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
|
||||
uploadA,
|
||||
uploadB,
|
||||
uploadC,
|
||||
})
|
||||
assert.Nil(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue