From b73d634b47a851022a8c3a9a6bb8c9dc6aa3df65 Mon Sep 17 00:00:00 2001 From: Marius Date: Thu, 19 Sep 2019 12:14:25 +0200 Subject: [PATCH] core: Pass Upload interfaces to ConcatUploads --- pkg/filestore/filestore.go | 14 ++++++++++---- pkg/filestore/filestore_test.go | 9 +++------ pkg/handler/concat_test.go | 3 ++- pkg/handler/datastore.go | 12 ++++++++---- pkg/handler/handler_mock_test.go | 28 +++++++++++++++++++++------- pkg/handler/unrouted_handler.go | 23 ++++++++++++++--------- pkg/handler/utils_test.go | 1 + pkg/s3store/s3store.go | 23 +++++++++++++---------- pkg/s3store/s3store_test.go | 18 ++++++++++++++---- 9 files changed, 86 insertions(+), 45 deletions(-) diff --git a/pkg/filestore/filestore.go b/pkg/filestore/filestore.go index 2b7a15f..24a186b 100644 --- a/pkg/filestore/filestore.go +++ b/pkg/filestore/filestore.go @@ -116,6 +116,10 @@ func (store FileStore) AsLengthDeclarableUpload(upload handler.Upload) handler.L return upload.(*fileUpload) } +func (store FileStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload { + return upload.(*fileUpload) +} + // binPath returns the path to the file storing the binary data. func (store FileStore) binPath(id string) string { return filepath.Join(store.Path, id) @@ -175,15 +179,17 @@ func (upload *fileUpload) Terminate(ctx context.Context) error { return nil } -func (store FileStore) ConcatUploads(ctx context.Context, dest string, uploads []string) (err error) { - file, err := os.OpenFile(store.binPath(dest), os.O_WRONLY|os.O_APPEND, defaultFilePerm) +func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []handler.Upload) (err error) { + file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm) if err != nil { return err } defer file.Close() - for _, id := range uploads { - src, err := os.Open(store.binPath(id)) + for _, partialUpload := range uploads { + fileUpload := partialUpload.(*fileUpload) + + src, err := os.Open(fileUpload.binPath) if err != nil { return err } diff --git a/pkg/filestore/filestore_test.go b/pkg/filestore/filestore_test.go index 2e698bc..6bbb4ee 100644 --- a/pkg/filestore/filestore_test.go +++ b/pkg/filestore/filestore_test.go @@ -108,7 +108,7 @@ func TestConcatUploads(t *testing.T) { finId := finInfo.ID // Create three uploads for concatenating - ids := make([]string, 3) + partialUploads := make([]handler.Upload, 3) contents := []string{ "abc", "def", @@ -122,13 +122,10 @@ func TestConcatUploads(t *testing.T) { a.NoError(err) a.EqualValues(3, n) - info, err := upload.GetInfo(ctx) - a.NoError(err) - - ids[i] = info.ID + partialUploads[i] = upload } - err = store.ConcatUploads(ctx, finId, ids) + err = store.AsConcatableUpload(finUpload).ConcatUploads(ctx, partialUploads) a.NoError(err) // Check offset diff --git a/pkg/handler/concat_test.go b/pkg/handler/concat_test.go index f62278d..98bca7e 100644 --- a/pkg/handler/concat_test.go +++ b/pkg/handler/concat_test.go @@ -141,7 +141,8 @@ func TestConcat(t *testing.T) { PartialUploads: []string{"a", "b"}, MetaData: make(map[string]string), }, nil), - store.EXPECT().ConcatUploads(context.Background(), "foo", []string{"a", "b"}).Return(nil), + store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC), + uploadC.EXPECT().ConcatUploads(context.Background(), []Upload{uploadA, uploadB}).Return(nil), ) handler, _ := NewHandler(Config{ diff --git a/pkg/handler/datastore.go b/pkg/handler/datastore.go index 7b1cda2..1eb4240 100644 --- a/pkg/handler/datastore.go +++ b/pkg/handler/datastore.go @@ -105,13 +105,17 @@ type TerminaterDataStore interface { // Concatenation extension should be enabled. Only in this case, the handler // will parse and respect the Upload-Concat header. type ConcaterDataStore interface { - // ConcatUploads concatenations the content from the provided partial uploads - // and write the result in the destination upload which is specified by its - // ID. The caller (usually the handler) must and will ensure that this + AsConcatableUpload(upload Upload) ConcatableUpload +} + +type ConcatableUpload interface { + // ConcatUploads concatenates the content from the provided partial uploads + // and writes the result in the destination upload. + // The caller (usually the handler) must and will ensure that this // destination upload has been created before with enough space to hold all // partial uploads. The order, in which the partial uploads are supplied, // must be respected during concatenation. - ConcatUploads(ctx context.Context, destination string, partialUploads []string) error + ConcatUploads(ctx context.Context, partialUploads []Upload) error } // LengthDeferrerDataStore is the interface that must be implemented if the diff --git a/pkg/handler/handler_mock_test.go b/pkg/handler/handler_mock_test.go index c8ac5a3..062c7fa 100644 --- a/pkg/handler/handler_mock_test.go +++ b/pkg/handler/handler_mock_test.go @@ -79,18 +79,18 @@ func (mr *MockFullDataStoreMockRecorder) AsTerminatableUpload(upload interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsTerminatableUpload", reflect.TypeOf((*MockFullDataStore)(nil).AsTerminatableUpload), upload) } -// ConcatUploads mocks base method -func (m *MockFullDataStore) ConcatUploads(ctx context.Context, destination string, partialUploads []string) error { +// AsConcatableUpload mocks base method +func (m *MockFullDataStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ConcatUploads", ctx, destination, partialUploads) - ret0, _ := ret[0].(error) + ret := m.ctrl.Call(m, "AsConcatableUpload", upload) + ret0, _ := ret[0].(handler.ConcatableUpload) return ret0 } -// ConcatUploads indicates an expected call of ConcatUploads -func (mr *MockFullDataStoreMockRecorder) ConcatUploads(ctx, destination, partialUploads interface{}) *gomock.Call { +// AsConcatableUpload indicates an expected call of AsConcatableUpload +func (mr *MockFullDataStoreMockRecorder) AsConcatableUpload(upload interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConcatUploads", reflect.TypeOf((*MockFullDataStore)(nil).ConcatUploads), ctx, destination, partialUploads) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsConcatableUpload", reflect.TypeOf((*MockFullDataStore)(nil).AsConcatableUpload), upload) } // AsLengthDeclarableUpload mocks base method @@ -217,6 +217,20 @@ func (mr *MockFullUploadMockRecorder) DeclareLength(ctx, length interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeclareLength", reflect.TypeOf((*MockFullUpload)(nil).DeclareLength), ctx, length) } +// ConcatUploads mocks base method +func (m *MockFullUpload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConcatUploads", ctx, partialUploads) + ret0, _ := ret[0].(error) + return ret0 +} + +// ConcatUploads indicates an expected call of ConcatUploads +func (mr *MockFullUploadMockRecorder) ConcatUploads(ctx, partialUploads interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConcatUploads", reflect.TypeOf((*MockFullUpload)(nil).ConcatUploads), ctx, partialUploads) +} + // MockFullLocker is a mock of FullLocker interface type MockFullLocker struct { ctrl *gomock.Controller diff --git a/pkg/handler/unrouted_handler.go b/pkg/handler/unrouted_handler.go index d72356f..fdf5e10 100644 --- a/pkg/handler/unrouted_handler.go +++ b/pkg/handler/unrouted_handler.go @@ -289,7 +289,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) } // Parse Upload-Concat header - isPartial, isFinal, partialUploads, err := parseConcat(concatHeader) + isPartial, isFinal, partialUploadIDs, err := parseConcat(concatHeader) if err != nil { handler.sendError(w, r, err) return @@ -300,6 +300,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) // Upload-Length header) var size int64 var sizeIsDeferred bool + var partialUploads []Upload if isFinal { // A final upload must not contain a chunk within the creation request if containsChunk { @@ -307,7 +308,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) return } - size, err = handler.sizeOfUploads(ctx, partialUploads) + partialUploads, size, err = handler.sizeOfUploads(ctx, partialUploadIDs) if err != nil { handler.sendError(w, r, err) return @@ -337,7 +338,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) MetaData: meta, IsPartial: isPartial, IsFinal: isFinal, - PartialUploads: partialUploads, + PartialUploads: partialUploadIDs, } if handler.config.PreUploadCreateCallback != nil { @@ -374,7 +375,8 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) } if isFinal { - if err := handler.composer.Concater.ConcatUploads(ctx, id, partialUploads); err != nil { + concatableUpload := handler.composer.Concater.AsConcatableUpload(upload) + if err := concatableUpload.ConcatUploads(ctx, partialUploads); err != nil { handler.sendError(w, r, err) return } @@ -1034,24 +1036,27 @@ func getHostAndProtocol(r *http.Request, allowForwarded bool) (host, proto strin // The get sum of all sizes for a list of upload ids while checking whether // all of these uploads are finished yet. This is used to calculate the size // of a final resource. -func (handler *UnroutedHandler) sizeOfUploads(ctx context.Context, ids []string) (size int64, err error) { - for _, id := range ids { +func (handler *UnroutedHandler) sizeOfUploads(ctx context.Context, ids []string) (partialUploads []Upload, size int64, err error) { + partialUploads = make([]Upload, len(ids)) + + for i, id := range ids { upload, err := handler.composer.Core.GetUpload(ctx, id) if err != nil { - return size, err + return nil, 0, err } info, err := upload.GetInfo(ctx) if err != nil { - return size, err + return nil, 0, err } if info.SizeIsDeferred || info.Offset != info.Size { err = ErrUploadNotFinished - return size, err + return nil, 0, err } size += info.Size + partialUploads[i] = upload } return diff --git a/pkg/handler/utils_test.go b/pkg/handler/utils_test.go index ef298ff..46da576 100644 --- a/pkg/handler/utils_test.go +++ b/pkg/handler/utils_test.go @@ -32,6 +32,7 @@ type FullUpload interface { handler.Upload handler.TerminatableUpload handler.LengthDeclarableUpload + handler.ConcatableUpload } type FullLocker interface { diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index ad5d44f..d765de4 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -241,6 +241,10 @@ func (store S3Store) AsLengthDeclarableUpload(upload handler.Upload) handler.Len return upload.(*s3Upload) } +func (store S3Store) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload { + return upload.(*s3Upload) +} + func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error { id := upload.id store := upload.store @@ -582,8 +586,10 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { return err } -func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUploads []string) error { - uploadId, multipartId := splitIds(dest) +func (upload *s3Upload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error { + id := upload.id + store := upload.store + uploadId, multipartId := splitIds(id) numPartialUploads := len(partialUploads) errs := make([]error, 0, numPartialUploads) @@ -591,12 +597,13 @@ func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUplo // Copy partial uploads concurrently var wg sync.WaitGroup wg.Add(numPartialUploads) - for i, partialId := range partialUploads { + for i, partialUpload := range partialUploads { + partialS3Upload := partialUpload.(*s3Upload) + partialId, _ := splitIds(partialS3Upload.id) + go func(i int, partialId string) { defer wg.Done() - partialUploadId, _ := splitIds(partialId) - _, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(uploadId), @@ -604,7 +611,7 @@ func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUplo // Part numbers must be in the range of 1 to 10000, inclusive. Since // slice indexes start at 0, we add 1 to ensure that i >= 1. PartNumber: aws.Int64(int64(i + 1)), - CopySource: aws.String(store.Bucket + "/" + partialUploadId), + CopySource: aws.String(store.Bucket + "/" + partialId), }) if err != nil { errs = append(errs, err) @@ -619,10 +626,6 @@ func (store S3Store) ConcatUploads(ctx context.Context, dest string, partialUplo return newMultiError(errs) } - upload, err := store.GetUpload(ctx, dest) - if err != nil { - return err - } return upload.FinishUpload(ctx) } diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 209dba1..49c3e0e 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -1125,10 +1125,20 @@ func TestConcatUploads(t *testing.T) { }).Return(nil, nil), ) - err := store.ConcatUploads(context.Background(), "uploadId+multipartId", []string{ - "aaa+AAA", - "bbb+BBB", - "ccc+CCC", + upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + assert.Nil(err) + + uploadA, err := store.GetUpload(context.Background(), "aaa+AAA") + assert.Nil(err) + uploadB, err := store.GetUpload(context.Background(), "bbb+BBB") + assert.Nil(err) + uploadC, err := store.GetUpload(context.Background(), "ccc+CCC") + assert.Nil(err) + + err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{ + uploadA, + uploadB, + uploadC, }) assert.Nil(err) }