From bbf706622c7c5234844fbf2b8c5efdbcda1e3861 Mon Sep 17 00:00:00 2001
From: Marius
Date: Wed, 3 Feb 2016 21:18:21 +0100
Subject: [PATCH] Implement Concatenation extension for S3Store

---
 s3store/multi_error.go  | 13 ++++++
 s3store/s3store.go      | 53 +++++++++++++++++++++++------
 s3store/s3store_test.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 147 insertions(+), 7 deletions(-)
 create mode 100644 s3store/multi_error.go

diff --git a/s3store/multi_error.go b/s3store/multi_error.go
new file mode 100644
index 0000000..7bdb830
--- /dev/null
+++ b/s3store/multi_error.go
@@ -0,0 +1,13 @@
+package s3store
+
+import (
+	"errors"
+)
+
+func newMultiError(errs []error) error {
+	message := "Multiple errors occurred:\n"
+	for _, err := range errs {
+		message += "\t" + err.Error() + "\n"
+	}
+	return errors.New(message)
+}
diff --git a/s3store/s3store.go b/s3store/s3store.go
index 357671d..a26b7ce 100644
--- a/s3store/s3store.go
+++ b/s3store/s3store.go
@@ -66,7 +66,7 @@
 // ensure that the server running this storage backend has enough disk space
 // available to hold these caches.
 //
-// In addition, it must be mentioned that AWS S3 only offeres eventual
+// In addition, it must be mentioned that AWS S3 only offers eventual
 // consistency (https://aws.amazon.com/s3/faqs/#What_data_consistency_model_does_Amazon_S3_employ).
 // Therefore, it is required to build additional measurements in order to
 // prevent concurrent access to the same upload resources which may result in
@@ -395,11 +395,7 @@ func (store S3Store) Terminate(id string) error {
 	wg.Wait()
 
 	if len(errs) > 0 {
-		message := "Multiple errors occured:\n"
-		for _, err := range errs {
-			message += "\t" + err.Error() + "\n"
-		}
-		return errors.New(message)
+		return newMultiError(errs)
 	}
 
 	return nil
@@ -442,6 +438,51 @@ func (store S3Store) FinishUpload(id string) error {
 	return err
 }
 
+func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
+	uploadId, multipartId := splitIds(dest)
+
+	numPartialUploads := len(partialUploads)
+	errs := make([]error, 0, numPartialUploads)
+
+	// Copy partial uploads concurrently. The mutex guards errs, which may be
+	// appended to from multiple goroutines at once.
+	var mutex sync.Mutex
+	var wg sync.WaitGroup
+	wg.Add(numPartialUploads)
+	for i, partialId := range partialUploads {
+		go func(i int, partialId string) {
+			defer wg.Done()
+
+			partialUploadId, _ := splitIds(partialId)
+
+			_, err := store.Service.UploadPartCopy(&s3.UploadPartCopyInput{
+				Bucket:   aws.String(store.Bucket),
+				Key:      aws.String(uploadId),
+				UploadId: aws.String(multipartId),
+				// Part numbers must be in the range of 1 to 10000, inclusive. Since
+				// slice indexes start at 0, we add 1 to ensure that i >= 1.
+				PartNumber: aws.Int64(int64(i + 1)),
+				// CopySource expects the source object in "bucket/key" form.
+				CopySource: aws.String(store.Bucket + "/" + partialUploadId),
+			})
+			if err != nil {
+				mutex.Lock()
+				errs = append(errs, err)
+				mutex.Unlock()
+				return
+			}
+		}(i, partialId)
+	}
+
+	wg.Wait()
+
+	if len(errs) > 0 {
+		return newMultiError(errs)
+	}
+
+	return store.FinishUpload(dest)
+}
+
 func splitIds(id string) (uploadId, multipartId string) {
 	index := strings.Index(id, "+")
 	if index == -1 {
diff --git a/s3store/s3store_test.go b/s3store/s3store_test.go
index 873f14d..adead7f 100644
--- a/s3store/s3store_test.go
+++ b/s3store/s3store_test.go
@@ -22,6 +22,7 @@ var _ tusd.DataStore = s3store.S3Store{}
 var _ tusd.GetReaderDataStore = s3store.S3Store{}
 var _ tusd.TerminaterDataStore = s3store.S3Store{}
 var _ tusd.FinisherDataStore = s3store.S3Store{}
+var _ tusd.ConcaterDataStore = s3store.S3Store{}
 
 func TestNewUpload(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
@@ -536,3 +537,88 @@ func TestTerminateWithErrors(t *testing.T) {
 	err := store.Terminate("uploadId+multipartId")
-	assert.Equal("Multiple errors occured:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
+	assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
 }
+
+func TestConcatUploads(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := s3store.New("bucket", s3obj)
+
+	s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{
+		Bucket:     aws.String("bucket"),
+		Key:        aws.String("uploadId"),
+		UploadId:   aws.String("multipartId"),
+		CopySource: aws.String("bucket/aaa"),
+		PartNumber: aws.Int64(1),
+	}).Return(nil, nil)
+
+	s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{
+		Bucket:     aws.String("bucket"),
+		Key:        aws.String("uploadId"),
+		UploadId:   aws.String("multipartId"),
+		CopySource: aws.String("bucket/bbb"),
+		PartNumber: aws.Int64(2),
+	}).Return(nil, nil)
+
+	s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{
+		Bucket:     aws.String("bucket"),
+		Key:        aws.String("uploadId"),
+		UploadId:   aws.String("multipartId"),
+		CopySource: aws.String("bucket/ccc"),
+		PartNumber: aws.Int64(3),
+	}).Return(nil, nil)
+
+	// Expectations for the calls made by S3Store.FinishUpload
+	gomock.InOrder(
+		s3obj.EXPECT().ListParts(&s3.ListPartsInput{
+			Bucket:   aws.String("bucket"),
+			Key:      aws.String("uploadId"),
+			UploadId: aws.String("multipartId"),
+		}).Return(&s3.ListPartsOutput{
+			Parts: []*s3.Part{
+				{
+					ETag:       aws.String("foo"),
+					PartNumber: aws.Int64(1),
+				},
+				{
+					ETag:       aws.String("bar"),
+					PartNumber: aws.Int64(2),
+				},
+				{
+					ETag:       aws.String("baz"),
+					PartNumber: aws.Int64(3),
+				},
+			},
+		}, nil),
+		s3obj.EXPECT().CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
+			Bucket:   aws.String("bucket"),
+			Key:      aws.String("uploadId"),
+			UploadId: aws.String("multipartId"),
+			MultipartUpload: &s3.CompletedMultipartUpload{
+				Parts: []*s3.CompletedPart{
+					{
+						ETag:       aws.String("foo"),
+						PartNumber: aws.Int64(1),
+					},
+					{
+						ETag:       aws.String("bar"),
+						PartNumber: aws.Int64(2),
+					},
+					{
+						ETag:       aws.String("baz"),
+						PartNumber: aws.Int64(3),
+					},
+				},
+			},
+		}).Return(nil, nil),
+	)
+
+	err := store.ConcatUploads("uploadId+multipartId", []string{
+		"aaa+AAA",
+		"bbb+BBB",
+		"ccc+CCC",
+	})
+	assert.Nil(err)
+}
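
Note for reviewers: below is a minimal, untested sketch of how the new
ConcatUploads method might be driven outside of the mocked test suite. The
region, bucket name, and upload IDs are placeholders and not part of this
patch; the only assumption taken from the patch itself is that IDs have the
"<objectKey>+<multipartId>" form handled by splitIds.

package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"

	"github.com/tus/tusd/s3store"
)

func main() {
	// Placeholder region and bucket; adjust for your environment.
	svc := s3.New(session.New(&aws.Config{Region: aws.String("eu-west-1")}))
	store := s3store.New("my-bucket", svc)

	// The destination upload and all partial uploads must already exist.
	// These IDs are illustrative; real IDs are returned by store.NewUpload.
	err := store.ConcatUploads("dest+destMultipartId", []string{
		"partial1+partialMultipartId1",
		"partial2+partialMultipartId2",
	})
	if err != nil {
		log.Fatal(err)
	}
}

One caveat worth keeping in mind: S3 requires every part of a multipart
upload except the last one to be at least 5 MB, so concatenating very small
partial uploads may be rejected when the copied parts are completed.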