diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go
index 02824ca..d3f7224 100644
--- a/pkg/s3store/s3store.go
+++ b/pkg/s3store/s3store.go
@@ -614,6 +614,92 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
 }
 
 func (upload *s3Upload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error {
+	hasSmallPart := false
+	for _, partialUpload := range partialUploads {
+		info, err := partialUpload.GetInfo(ctx)
+		if err != nil {
+			return err
+		}
+
+		if info.Size < upload.store.MinPartSize {
+			hasSmallPart = true
+		}
+	}
+
+	// If one partial upload is smaller than the minimum part size for an S3
+	// Multipart Upload, we cannot use S3 Multipart Uploads for concatenating all
+	// the files.
+	// So instead we have to download them and concat them on disk.
+	if hasSmallPart {
+		return upload.concatUsingDownload(ctx, partialUploads)
+	} else {
+		return upload.concatUsingMultipart(ctx, partialUploads)
+	}
+}
+
+func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads []handler.Upload) error {
+	id := upload.id
+	store := upload.store
+	uploadId, multipartId := splitIds(id)
+
+	// Create a temporary file for holding the concatenated data
+	file, err := ioutil.TempFile("", "tusd-s3-concat-tmp-")
+	if err != nil {
+		return err
+	}
+	defer os.Remove(file.Name())
+	defer file.Close()
+
+	// Download each part and append it to the temporary file
+	for _, partialUpload := range partialUploads {
+		partialS3Upload := partialUpload.(*s3Upload)
+		partialId, _ := splitIds(partialS3Upload.id)
+
+		res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
+			Bucket: aws.String(store.Bucket),
+			Key:    store.keyWithPrefix(partialId),
+		})
+		if err != nil {
+			return err
+		}
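+		// Note: this deferred Close only runs once the whole function returns,
+		// so each response body stays open until the concatenation is finished.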
+		defer res.Body.Close()
+
+		if _, err := io.Copy(file, res.Body); err != nil {
+			return err
+		}
+	}
+
+	// Seek to the beginning of the file, so the entire file is being uploaded
+	file.Seek(0, 0)
+
+	// Upload the entire file to S3
+	_, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
+		Bucket: aws.String(store.Bucket),
+		Key:    store.keyWithPrefix(uploadId),
+		Body:   file,
+	})
+	if err != nil {
+		return err
+	}
+
+	// Finally, abort the multipart upload since it will no longer be used.
+	// This happens asynchronously since we do not need to wait for the result.
+	// Also, the error is ignored on purpose as it does not change the outcome of
+	// the request.
+	go func() {
+		store.Service.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
+			Bucket:   aws.String(store.Bucket),
+			Key:      store.keyWithPrefix(uploadId),
+			UploadId: aws.String(multipartId),
+		})
+	}()
+
+	return nil
+}
+
+func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads []handler.Upload) error {
 	id := upload.id
 	store := upload.store
 	uploadId, multipartId := splitIds(id)
diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go
index 2329df2..210d00c 100644
--- a/pkg/s3store/s3store_test.go
+++ b/pkg/s3store/s3store_test.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"io/ioutil"
 	"testing"
+	"time"
 
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
@@ -1228,13 +1229,14 @@ func TestTerminateWithErrors(t *testing.T) {
 	assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
 }
 
-func TestConcatUploads(t *testing.T) {
+func TestConcatUploadsUsingMultipart(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
 	store := New("bucket", s3obj)
+	store.MinPartSize = 100
 
 	s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{
 		Bucket: aws.String("bucket"),
@@ -1316,6 +1318,11 @@ func TestConcatUploads(t *testing.T) {
 	uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
 	assert.Nil(err)
 
+	// All uploads have a size larger than the MinPartSize, so an S3 Multipart Upload is used for concatenation.
+	uploadA.(*s3Upload).info = &handler.FileInfo{Size: 500}
+	uploadB.(*s3Upload).info = &handler.FileInfo{Size: 500}
+	uploadC.(*s3Upload).info = &handler.FileInfo{Size: 500}
+
 	err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
 		uploadA,
 		uploadB,
@@ -1323,3 +1330,71 @@ func TestConcatUploads(t *testing.T) {
 	})
 	assert.Nil(err)
 }
+
+func TestConcatUploadsUsingDownload(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := New("bucket", s3obj)
+	store.MinPartSize = 100
+
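+	// Expect the partial uploads to be downloaded in order, concatenated into
+	// a single PutObject call, and the multipart upload aborted afterwards.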
+	gomock.InOrder(
+		s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
+			Bucket: aws.String("bucket"),
+			Key:    aws.String("aaa"),
+		}).Return(&s3.GetObjectOutput{
+			Body: ioutil.NopCloser(bytes.NewReader([]byte("aaa"))),
+		}, nil),
+		s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
+			Bucket: aws.String("bucket"),
+			Key:    aws.String("bbb"),
+		}).Return(&s3.GetObjectOutput{
+			Body: ioutil.NopCloser(bytes.NewReader([]byte("bbbb"))),
+		}, nil),
+		s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
+			Bucket: aws.String("bucket"),
+			Key:    aws.String("ccc"),
+		}).Return(&s3.GetObjectOutput{
+			Body: ioutil.NopCloser(bytes.NewReader([]byte("ccccc"))),
+		}, nil),
+		s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
+			Bucket: aws.String("bucket"),
+			Key:    aws.String("uploadId"),
+			Body:   bytes.NewReader([]byte("aaabbbbccccc")),
+		})),
+		s3obj.EXPECT().AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
+			Bucket:   aws.String("bucket"),
+			Key:      aws.String("uploadId"),
+			UploadId: aws.String("multipartId"),
+		}).Return(nil, nil),
+	)
+
+	upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
+	assert.Nil(err)
+
+	uploadA, err := store.GetUpload(context.Background(), "aaa+AAA")
+	assert.Nil(err)
+	uploadB, err := store.GetUpload(context.Background(), "bbb+BBB")
+	assert.Nil(err)
+	uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
+	assert.Nil(err)
+
+	// All uploads have a size smaller than the MinPartSize, so the files are downloaded for concatenation.
+	uploadA.(*s3Upload).info = &handler.FileInfo{Size: 3}
+	uploadB.(*s3Upload).info = &handler.FileInfo{Size: 4}
+	uploadC.(*s3Upload).info = &handler.FileInfo{Size: 5}
+
+	err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
+		uploadA,
+		uploadB,
+		uploadC,
+	})
+	assert.Nil(err)
+
+	// Wait a short delay so the asynchronous AbortMultipartUploadWithContext call can occur.
+	<-time.After(10 * time.Millisecond)
+}