s3store: Allow concatenation of uploads with less than 5MB
This commit is contained in:
parent
cfebf1778e
commit
9210fbe0fc
|
@ -614,6 +614,91 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (upload *s3Upload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error {
|
func (upload *s3Upload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error {
|
||||||
|
hasSmallPart := false
|
||||||
|
for _, partialUpload := range partialUploads {
|
||||||
|
info, err := partialUpload.GetInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.Size < upload.store.MinPartSize {
|
||||||
|
hasSmallPart = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If one partial upload is smaller than the the minimum part size for an S3
|
||||||
|
// Multipart Upload, we cannot use S3 Multipart Uploads for concatenating all
|
||||||
|
// the files.
|
||||||
|
// So instead we have to download them and concat them on disk.
|
||||||
|
if hasSmallPart {
|
||||||
|
return upload.concatUsingDownload(ctx, partialUploads)
|
||||||
|
} else {
|
||||||
|
return upload.concatUsingMultipart(ctx, partialUploads)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads []handler.Upload) error {
|
||||||
|
id := upload.id
|
||||||
|
store := upload.store
|
||||||
|
uploadId, multipartId := splitIds(id)
|
||||||
|
|
||||||
|
// Create a temporary file for holding the concatenated data
|
||||||
|
file, err := ioutil.TempFile("", "tusd-s3-concat-tmp-")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Println(file.Name())
|
||||||
|
defer os.Remove(file.Name())
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
// Download each part and append it to the temporary file
|
||||||
|
for _, partialUpload := range partialUploads {
|
||||||
|
partialS3Upload := partialUpload.(*s3Upload)
|
||||||
|
partialId, _ := splitIds(partialS3Upload.id)
|
||||||
|
|
||||||
|
res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
|
||||||
|
Bucket: aws.String(store.Bucket),
|
||||||
|
Key: store.keyWithPrefix(partialId),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
if _, err := io.Copy(file, res.Body); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek to the beginning of the file, so the entire file is being uploaded
|
||||||
|
file.Seek(0, 0)
|
||||||
|
|
||||||
|
// Upload the entire file to S3
|
||||||
|
_, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
|
||||||
|
Bucket: aws.String(store.Bucket),
|
||||||
|
Key: store.keyWithPrefix(uploadId),
|
||||||
|
Body: file,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, abort the multipart upload since it will no longer be used.
|
||||||
|
// This happens asynchronously since we do not need to wait for the result.
|
||||||
|
// Also, the error is ignored on purpose as it does not change the outcome of
|
||||||
|
// the request.
|
||||||
|
go func() {
|
||||||
|
store.Service.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
|
||||||
|
Bucket: aws.String(store.Bucket),
|
||||||
|
Key: store.keyWithPrefix(uploadId),
|
||||||
|
UploadId: aws.String(multipartId),
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads []handler.Upload) error {
|
||||||
id := upload.id
|
id := upload.id
|
||||||
store := upload.store
|
store := upload.store
|
||||||
uploadId, multipartId := splitIds(id)
|
uploadId, multipartId := splitIds(id)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -1228,13 +1229,14 @@ func TestTerminateWithErrors(t *testing.T) {
|
||||||
assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
|
assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConcatUploads(t *testing.T) {
|
func TestConcatUploadsUsingMultipart(t *testing.T) {
|
||||||
mockCtrl := gomock.NewController(t)
|
mockCtrl := gomock.NewController(t)
|
||||||
defer mockCtrl.Finish()
|
defer mockCtrl.Finish()
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
||||||
s3obj := NewMockS3API(mockCtrl)
|
s3obj := NewMockS3API(mockCtrl)
|
||||||
store := New("bucket", s3obj)
|
store := New("bucket", s3obj)
|
||||||
|
store.MinPartSize = 100
|
||||||
|
|
||||||
s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{
|
s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{
|
||||||
Bucket: aws.String("bucket"),
|
Bucket: aws.String("bucket"),
|
||||||
|
@ -1316,6 +1318,11 @@ func TestConcatUploads(t *testing.T) {
|
||||||
uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
|
uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
|
|
||||||
|
// All uploads have a size larger than the MinPartSize, so a S3 Multipart Upload is used for concatenation.
|
||||||
|
uploadA.(*s3Upload).info = &handler.FileInfo{Size: 500}
|
||||||
|
uploadB.(*s3Upload).info = &handler.FileInfo{Size: 500}
|
||||||
|
uploadC.(*s3Upload).info = &handler.FileInfo{Size: 500}
|
||||||
|
|
||||||
err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
|
err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
|
||||||
uploadA,
|
uploadA,
|
||||||
uploadB,
|
uploadB,
|
||||||
|
@ -1323,3 +1330,69 @@ func TestConcatUploads(t *testing.T) {
|
||||||
})
|
})
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcatUploadsUsingDownload(t *testing.T) {
|
||||||
|
mockCtrl := gomock.NewController(t)
|
||||||
|
defer mockCtrl.Finish()
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
s3obj := NewMockS3API(mockCtrl)
|
||||||
|
store := New("bucket", s3obj)
|
||||||
|
store.MinPartSize = 100
|
||||||
|
|
||||||
|
gomock.InOrder(
|
||||||
|
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||||
|
Bucket: aws.String("bucket"),
|
||||||
|
Key: aws.String("aaa"),
|
||||||
|
}).Return(&s3.GetObjectOutput{
|
||||||
|
Body: ioutil.NopCloser(bytes.NewReader([]byte("aaa"))),
|
||||||
|
}, nil),
|
||||||
|
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||||
|
Bucket: aws.String("bucket"),
|
||||||
|
Key: aws.String("bbb"),
|
||||||
|
}).Return(&s3.GetObjectOutput{
|
||||||
|
Body: ioutil.NopCloser(bytes.NewReader([]byte("bbbb"))),
|
||||||
|
}, nil),
|
||||||
|
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||||
|
Bucket: aws.String("bucket"),
|
||||||
|
Key: aws.String("ccc"),
|
||||||
|
}).Return(&s3.GetObjectOutput{
|
||||||
|
Body: ioutil.NopCloser(bytes.NewReader([]byte("ccccc"))),
|
||||||
|
}, nil),
|
||||||
|
s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
|
||||||
|
Bucket: aws.String("bucket"),
|
||||||
|
Key: aws.String("uploadId"),
|
||||||
|
Body: bytes.NewReader([]byte("aaabbbbccccc")),
|
||||||
|
})),
|
||||||
|
s3obj.EXPECT().AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
|
||||||
|
Bucket: aws.String("bucket"),
|
||||||
|
Key: aws.String("uploadId"),
|
||||||
|
UploadId: aws.String("multipartId"),
|
||||||
|
}).Return(nil, nil),
|
||||||
|
)
|
||||||
|
|
||||||
|
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
|
||||||
|
assert.Nil(err)
|
||||||
|
|
||||||
|
uploadA, err := store.GetUpload(context.Background(), "aaa+AAA")
|
||||||
|
assert.Nil(err)
|
||||||
|
uploadB, err := store.GetUpload(context.Background(), "bbb+BBB")
|
||||||
|
assert.Nil(err)
|
||||||
|
uploadC, err := store.GetUpload(context.Background(), "ccc+CCC")
|
||||||
|
assert.Nil(err)
|
||||||
|
|
||||||
|
// All uploads have a size smaller than the MinPartSize, so the files are downloaded for concatenation.
|
||||||
|
uploadA.(*s3Upload).info = &handler.FileInfo{Size: 3}
|
||||||
|
uploadB.(*s3Upload).info = &handler.FileInfo{Size: 4}
|
||||||
|
uploadC.(*s3Upload).info = &handler.FileInfo{Size: 5}
|
||||||
|
|
||||||
|
err = store.AsConcatableUpload(upload).ConcatUploads(context.Background(), []handler.Upload{
|
||||||
|
uploadA,
|
||||||
|
uploadB,
|
||||||
|
uploadC,
|
||||||
|
})
|
||||||
|
assert.Nil(err)
|
||||||
|
|
||||||
|
// Wait a short delay until the call to AbortMultipartUploadWithContext also occurs.
|
||||||
|
<-time.After(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue