Implement Concatentation extension for S3Store

This commit is contained in:
Marius 2016-02-03 21:18:21 +01:00
parent 9cd15b2a0f
commit bbf706622c
3 changed files with 141 additions and 6 deletions

13
s3store/multi_error.go Normal file
View File

@ -0,0 +1,13 @@
package s3store
import (
"errors"
)
func newMultiError(errs []error) error {
message := "Multiple errors occured:\n"
for _, err := range errs {
message += "\t" + err.Error() + "\n"
}
return errors.New(message)
}

View File

@ -66,7 +66,7 @@
// ensure that the server running this storage backend has enough disk space // ensure that the server running this storage backend has enough disk space
// available to hold these caches. // available to hold these caches.
// //
// In addition, it must be mentioned that AWS S3 only offeres eventual // In addition, it must be mentioned that AWS S3 only offers eventual
// consistency (https://aws.amazon.com/s3/faqs/#What_data_consistency_model_does_Amazon_S3_employ). // consistency (https://aws.amazon.com/s3/faqs/#What_data_consistency_model_does_Amazon_S3_employ).
// Therefore, it is required to build additional measurements in order to // Therefore, it is required to build additional measurements in order to
// prevent concurrent access to the same upload resources which may result in // prevent concurrent access to the same upload resources which may result in
@ -395,11 +395,7 @@ func (store S3Store) Terminate(id string) error {
wg.Wait() wg.Wait()
if len(errs) > 0 { if len(errs) > 0 {
message := "Multiple errors occured:\n" return newMultiError(errs)
for _, err := range errs {
message += "\t" + err.Error() + "\n"
}
return errors.New(message)
} }
return nil return nil
@ -442,6 +438,46 @@ func (store S3Store) FinishUpload(id string) error {
return err return err
} }
func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
uploadId, multipartId := splitIds(dest)
numPartialUploads := len(partialUploads)
errs := make([]error, 0, numPartialUploads)
// Copy partial uploads concurrently
var wg sync.WaitGroup
wg.Add(numPartialUploads)
for i, partialId := range partialUploads {
go func(i int, partialId string) {
defer wg.Done()
partialUploadId, _ := splitIds(partialId)
_, err := store.Service.UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String(store.Bucket),
Key: aws.String(uploadId),
UploadId: aws.String(multipartId),
// Part numbers must be in the range of 1 to 10000, inclusive. Since
// slice indexes start at 0, we add 1 to ensure that i >= 1.
PartNumber: aws.Int64(int64(i + 1)),
CopySource: aws.String(store.Bucket + "/" + partialUploadId),
})
if err != nil {
errs = append(errs, err)
return
}
}(i, partialId)
}
wg.Wait()
if len(errs) > 0 {
return newMultiError(errs)
}
return store.FinishUpload(dest)
}
func splitIds(id string) (uploadId, multipartId string) { func splitIds(id string) (uploadId, multipartId string) {
index := strings.Index(id, "+") index := strings.Index(id, "+")
if index == -1 { if index == -1 {

View File

@ -22,6 +22,7 @@ var _ tusd.DataStore = s3store.S3Store{}
var _ tusd.GetReaderDataStore = s3store.S3Store{} var _ tusd.GetReaderDataStore = s3store.S3Store{}
var _ tusd.TerminaterDataStore = s3store.S3Store{} var _ tusd.TerminaterDataStore = s3store.S3Store{}
var _ tusd.FinisherDataStore = s3store.S3Store{} var _ tusd.FinisherDataStore = s3store.S3Store{}
var _ tusd.ConcaterDataStore = s3store.S3Store{}
func TestNewUpload(t *testing.T) { func TestNewUpload(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
@ -536,3 +537,88 @@ func TestTerminateWithErrors(t *testing.T) {
err := store.Terminate("uploadId+multipartId") err := store.Terminate("uploadId+multipartId")
assert.Equal("Multiple errors occured:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error()) assert.Equal("Multiple errors occured:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
} }
func TestConcatUploads(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
s3obj := NewMockS3API(mockCtrl)
store := s3store.New("bucket", s3obj)
s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/aaa"),
PartNumber: aws.Int64(1),
}).Return(nil, nil)
s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/bbb"),
PartNumber: aws.Int64(2),
}).Return(nil, nil)
s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/ccc"),
PartNumber: aws.Int64(3),
}).Return(nil, nil)
// Output from s3Store.FinishUpload
gomock.InOrder(
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
ETag: aws.String("foo"),
PartNumber: aws.Int64(1),
},
{
ETag: aws.String("bar"),
PartNumber: aws.Int64(2),
},
{
ETag: aws.String("baz"),
PartNumber: aws.Int64(3),
},
},
}, nil),
s3obj.EXPECT().CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: []*s3.CompletedPart{
{
ETag: aws.String("foo"),
PartNumber: aws.Int64(1),
},
{
ETag: aws.String("bar"),
PartNumber: aws.Int64(2),
},
{
ETag: aws.String("baz"),
PartNumber: aws.Int64(3),
},
},
},
}).Return(nil, nil),
)
err := store.ConcatUploads("uploadId+multipartId", []string{
"aaa+AAA",
"bbb+BBB",
"ccc+CCC",
})
assert.Nil(err)
}