diff --git a/s3store/calcpartsize_test.go b/s3store/calcpartsize_test.go
new file mode 100644
index 0000000..4b0f9d7
--- /dev/null
+++ b/s3store/calcpartsize_test.go
@@ -0,0 +1,169 @@
+package s3store
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+)
+
+const enableTestDebugOutput = false
+
+func assertCalculatedPartSize(store S3Store, assert *assert.Assertions, size int64) {
+	optimalPartSize, err := store.calcOptimalPartSize(size)
+	assert.Nil(err, "Size %d, no error should be returned.\n", size)
+
+	// Number of parts that all share the same (optimal) size
+	equalParts := size / optimalPartSize
+	// Size of the last part (or 0 if no spare part is needed)
+	lastPartSize := size % optimalPartSize
+
+	prelude := fmt.Sprintf("Size %d, %d parts of size %d, lastpart %d: ", size, equalParts, optimalPartSize, lastPartSize)
+
+	assert.False(optimalPartSize < store.MinPartSize, prelude+"optimalPartSize < MinPartSize %d.\n", store.MinPartSize)
+	assert.False(optimalPartSize > store.MaxPartSize, prelude+"optimalPartSize > MaxPartSize %d.\n", store.MaxPartSize)
+	assert.False(lastPartSize == 0 && equalParts > store.MaxMultipartParts, prelude+"more parts than MaxMultipartParts %d.\n", store.MaxMultipartParts)
+	assert.False(lastPartSize > 0 && equalParts > store.MaxMultipartParts-1, prelude+"more parts than MaxMultipartParts %d.\n", store.MaxMultipartParts)
+	assert.False(lastPartSize > store.MaxPartSize, prelude+"lastpart > MaxPartSize %d.\n", store.MaxPartSize)
+	assert.False(lastPartSize > optimalPartSize, prelude+"lastpart > optimalPartSize %d.\n", optimalPartSize)
+	assert.True(size <= optimalPartSize*store.MaxMultipartParts, prelude+"upload does not fit in %d parts.\n", store.MaxMultipartParts)
+
+	if enableTestDebugOutput {
+		fmt.Printf(prelude+"does exceed MaxObjectSize: %t.\n", size > store.MaxObjectSize)
+	}
+}
+
+func TestCalcOptimalPartSize(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := New("bucket", s3obj)
+
+	// To quickly override the default limits for this test, uncomment:
+	/*
+		store.MinPartSize = 2
+		store.MaxPartSize = 10
+		store.MaxMultipartParts = 20
+		store.MaxObjectSize = 200
+	*/
+
+	// sanity check
+	if store.MaxObjectSize > store.MaxPartSize*store.MaxMultipartParts {
+		t.Errorf("MaxObjectSize %v can never be achieved, as MaxMultipartParts %v and MaxPartSize %v only allow for an upload of %v bytes total.\n", store.MaxObjectSize, store.MaxMultipartParts, store.MaxPartSize, store.MaxMultipartParts*store.MaxPartSize)
+	}
+
+	HighestApplicablePartSize := store.MaxObjectSize / store.MaxMultipartParts
+	if store.MaxObjectSize%store.MaxMultipartParts > 0 {
+		HighestApplicablePartSize++
+	}
+	RemainderWithHighestApplicablePartSize := store.MaxObjectSize % HighestApplicablePartSize
+
+	// Some of these test cases are effectively duplicates, as they specify the
+	// same size in bytes, merely expressed in two different ways. That is
+	// intentional, so the boundaries are covered from every angle.
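+	// For intuition, with the commented-out override values above
+	// (MaxObjectSize = 200, MaxMultipartParts = 20): HighestApplicablePartSize
+	// is 200/20 = 10 with remainder 0. With MaxObjectSize = 201 it would be
+	// rounded up to 11, leaving 201%11 = 3 as the remainder.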
+ testcases := []int64{ + 0, + 1, + store.MinPartSize - 1, + store.MinPartSize, + store.MinPartSize + 1, + + store.MinPartSize*(store.MaxMultipartParts-1) - 1, + store.MinPartSize * (store.MaxMultipartParts - 1), + store.MinPartSize*(store.MaxMultipartParts-1) + 1, + + store.MinPartSize*store.MaxMultipartParts - 1, + store.MinPartSize * store.MaxMultipartParts, + store.MinPartSize*store.MaxMultipartParts + 1, + + store.MinPartSize*(store.MaxMultipartParts+1) - 1, + store.MinPartSize * (store.MaxMultipartParts + 1), + store.MinPartSize*(store.MaxMultipartParts+1) + 1, + + (HighestApplicablePartSize-1)*store.MaxMultipartParts - 1, + (HighestApplicablePartSize - 1) * store.MaxMultipartParts, + (HighestApplicablePartSize-1)*store.MaxMultipartParts + 1, + + HighestApplicablePartSize*(store.MaxMultipartParts-1) - 1, + HighestApplicablePartSize * (store.MaxMultipartParts - 1), + HighestApplicablePartSize*(store.MaxMultipartParts-1) + 1, + + HighestApplicablePartSize*(store.MaxMultipartParts-1) + RemainderWithHighestApplicablePartSize - 1, + HighestApplicablePartSize*(store.MaxMultipartParts-1) + RemainderWithHighestApplicablePartSize, + HighestApplicablePartSize*(store.MaxMultipartParts-1) + RemainderWithHighestApplicablePartSize + 1, + + store.MaxObjectSize - 1, + store.MaxObjectSize, + store.MaxObjectSize + 1, + + (store.MaxObjectSize/store.MaxMultipartParts)*(store.MaxMultipartParts-1) - 1, + (store.MaxObjectSize / store.MaxMultipartParts) * (store.MaxMultipartParts - 1), + (store.MaxObjectSize/store.MaxMultipartParts)*(store.MaxMultipartParts-1) + 1, + + store.MaxPartSize*(store.MaxMultipartParts-1) - 1, + store.MaxPartSize * (store.MaxMultipartParts - 1), + store.MaxPartSize*(store.MaxMultipartParts-1) + 1, + + store.MaxPartSize*store.MaxMultipartParts - 1, + store.MaxPartSize * store.MaxMultipartParts, + // We cannot calculate a part size for store.MaxPartSize*store.MaxMultipartParts + 1 + // This case is tested in TestCalcOptimalPartSize_ExceedingMaxPartSize + } + + for _, size := range testcases { + assertCalculatedPartSize(store, assert, size) + } + + if enableTestDebugOutput { + fmt.Println("HighestApplicablePartSize", HighestApplicablePartSize) + fmt.Println("RemainderWithHighestApplicablePartSize", RemainderWithHighestApplicablePartSize) + } +} + +func TestCalcOptimalPartSize_AllUploadSizes(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + store.MinPartSize = 5 + store.MaxPartSize = 5 * 1024 + store.MaxMultipartParts = 1000 + store.MaxObjectSize = store.MaxPartSize * store.MaxMultipartParts + + // sanity check + if store.MaxObjectSize > store.MaxPartSize*store.MaxMultipartParts { + t.Errorf("MaxObjectSize %v can never be achieved, as MaxMultipartParts %v and MaxPartSize %v only allow for an upload of %v bytes total.\n", store.MaxObjectSize, store.MaxMultipartParts, store.MaxPartSize, store.MaxMultipartParts*store.MaxPartSize) + } + + for size := int64(0); size <= store.MaxObjectSize; size++ { + assertCalculatedPartSize(store, assert, size) + } +} + +func TestCalcOptimalPartSize_ExceedingMaxPartSize(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + size := store.MaxPartSize*store.MaxMultipartParts + 1 + + optimalPartSize, err := store.calcOptimalPartSize(size) + 
+	assert.NotNil(err)
+	assert.EqualError(err, fmt.Sprintf("calcOptimalPartSize: to upload %v bytes optimalPartSize %v must exceed MaxPartSize %v", size, optimalPartSize, store.MaxPartSize))
+}
diff --git a/s3store/s3store.go b/s3store/s3store.go
index 69f5c1a..3b9465e 100644
--- a/s3store/s3store.go
+++ b/s3store/s3store.go
@@ -128,6 +128,13 @@ type S3Store struct {
 	// in bytes. This number needs to match with the underlying S3 backend or else
-	// uploaded parts will be reject. AWS S3, for example, uses 5MB for this value.
+	// uploaded parts will be rejected. AWS S3, for example, uses 5MB for this value.
 	MinPartSize int64
+	// MaxMultipartParts is the maximum number of parts an S3 multipart upload is
+	// allowed to have according to AWS S3 API specifications.
+	// See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+	MaxMultipartParts int64
+	// MaxObjectSize is the maximum size an S3 Object can have according to S3
+	// API specifications. See link above.
+	MaxObjectSize int64
 }
 
 type S3API interface {
@@ -146,10 +153,12 @@ type S3API interface {
-// The MaxPartSize and MinPartSize properties are set to 6 and 5MB.
+// The MaxPartSize and MinPartSize properties are set to 5GB and 5MB.
 func New(bucket string, service S3API) S3Store {
 	return S3Store{
-		Bucket:      bucket,
-		Service:     service,
-		MaxPartSize: 6 * 1024 * 1024,
-		MinPartSize: 5 * 1024 * 1024,
+		Bucket:            bucket,
+		Service:           service,
+		MaxPartSize:       5 * 1024 * 1024 * 1024,
+		MinPartSize:       5 * 1024 * 1024,
+		MaxMultipartParts: 10000,
+		MaxObjectSize:     5 * 1024 * 1024 * 1024 * 1024,
 	}
 }
@@ -164,6 +173,11 @@ func (store S3Store) UseIn(composer *tusd.StoreComposer) {
 }
 
 func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
+	// An upload larger than MaxObjectSize must be rejected with an error.
+	if info.Size > store.MaxObjectSize {
+		return "", fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize)
+	}
+
 	var uploadId string
 	if info.ID == "" {
 		uploadId = uid.Uid()
@@ -224,19 +238,18 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 	size := info.Size
 	bytesUploaded := int64(0)
+	optimalPartSize, err := store.calcOptimalPartSize(size)
+	if err != nil {
+		return bytesUploaded, err
+	}
 
 	// Get number of parts to generate next number
-	listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
-		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
-		UploadId: aws.String(multipartId),
-	})
+	parts, err := store.listAllParts(id)
 	if err != nil {
 		return 0, err
 	}
 
-	list := *listPtr
-	numParts := len(list.Parts)
+	numParts := len(parts)
 	nextPartNum := int64(numParts + 1)
 
 	for {
@@ -248,7 +261,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 		defer os.Remove(file.Name())
 		defer file.Close()
 
-		limitedReader := io.LimitReader(src, store.MaxPartSize)
+		limitedReader := io.LimitReader(src, optimalPartSize)
 		n, err := io.Copy(file, limitedReader)
-		// io.Copy does not return io.EOF, so we not have to handle it differently.
+		// io.Copy does not return io.EOF, so we do not have to handle it differently.
 		if err != nil {
@@ -259,11 +272,11 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 			return bytesUploaded, nil
 		}
 
-		if (size - offset) <= store.MinPartSize {
+		if (size - offset) <= optimalPartSize {
 			if (size - offset) != n {
 				return bytesUploaded, nil
 			}
-		} else if n < store.MinPartSize {
+		} else if n < optimalPartSize {
 			return bytesUploaded, nil
 		}
 
@@ -288,7 +301,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 }
 
 func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
-	uploadId, multipartId := splitIds(id)
+	uploadId, _ := splitIds(id)
 
 	// Get file info stored in separate object
 	res, err := store.Service.GetObject(&s3.GetObjectInput{
@@ -308,11 +321,7 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
 	}
 
 	// Get uploaded parts and their offset
-	listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
-		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
-		UploadId: aws.String(multipartId),
-	})
+	parts, err := store.listAllParts(id)
 	if err != nil {
 		// Check if the error is caused by the upload not being found. This happens
 		// when the multipart upload has already been completed or aborted. Since
@@ -326,11 +335,9 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
 		}
 	}
 
-	list := *listPtr
-
 	offset := int64(0)
 
-	for _, part := range list.Parts {
+	for _, part := range parts {
 		offset += *part.Size
 	}
 
@@ -444,22 +451,17 @@ func (store S3Store) FinishUpload(id string) error {
 	uploadId, multipartId := splitIds(id)
 
 	// Get uploaded parts
-	listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
-		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
-		UploadId: aws.String(multipartId),
-	})
+	parts, err := store.listAllParts(id)
 	if err != nil {
 		return err
 	}
 
 	// Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next
 	// request.
-	list := *listPtr
-	parts := make([]*s3.CompletedPart, len(list.Parts))
+	completedParts := make([]*s3.CompletedPart, len(parts))
 
-	for index, part := range list.Parts {
-		parts[index] = &s3.CompletedPart{
+	for index, part := range parts {
+		completedParts[index] = &s3.CompletedPart{
 			ETag:       part.ETag,
 			PartNumber: part.PartNumber,
 		}
@@ -470,7 +472,7 @@ func (store S3Store) FinishUpload(id string) error {
 		Key:      aws.String(uploadId),
 		UploadId: aws.String(multipartId),
 		MultipartUpload: &s3.CompletedMultipartUpload{
-			Parts: parts,
+			Parts: completedParts,
 		},
 	})
 
@@ -517,6 +519,33 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
 	return store.FinishUpload(dest)
 }
 
+func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) {
+	uploadId, multipartId := splitIds(id)
+
+	partMarker := int64(0)
+	for {
+		// Get the uploaded parts, one page per iteration
+		listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
+			Bucket:           aws.String(store.Bucket),
+			Key:              aws.String(uploadId),
+			UploadId:         aws.String(multipartId),
+			PartNumberMarker: aws.Int64(partMarker),
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		parts = append(parts, listPtr.Parts...)
+
+		if listPtr.IsTruncated != nil && *listPtr.IsTruncated {
+			partMarker = *listPtr.NextPartNumberMarker
+		} else {
+			break
+		}
+	}
+	return parts, nil
+}
+
 func splitIds(id string) (uploadId, multipartId string) {
 	index := strings.Index(id, "+")
 	if index == -1 {
@@ -536,3 +565,53 @@ func isAwsError(err error, code string) bool {
 	}
 	return false
 }
+
+func (store S3Store) calcOptimalPartSize(size int64) (optimalPartSize int64, err error) {
+	switch {
+	// When the upload is smaller than or equal to MinPartSize, we upload it in just one part.
+	case size <= store.MinPartSize:
+		optimalPartSize = store.MinPartSize
+	// Does the upload fit into MaxMultipartParts parts or fewer with MinPartSize?
+	case size <= store.MinPartSize*store.MaxMultipartParts:
+		optimalPartSize = store.MinPartSize
+	// Prerequisite: the result of an integer division (x/y) is ALWAYS rounded
+	// DOWN, as the digits after the decimal point are discarded. To find out
+	// whether we got an exact result or a rounded-down one, we can check
+	// whether the remainder of that division is 0 (x%y == 0).
+	//
+	// So if (size/MaxMultipartParts) leaves no remainder, we can use the
+	// quotient directly as our optimalPartSize. But if the division produces
+	// a remainder, we have to round the result up by adding 1. Otherwise the
+	// upload would not fit into MaxMultipartParts parts of that size; we
+	// would need one additional part to upload everything.
+	// In almost all cases we could skip the remainder check and simply add 1
+	// to every result, but there is one case where doing so would doom the
+	// upload: when (MaxObjectSize == MaxPartSize * MaxMultipartParts), adding
+	// 1 would yield an optimalPartSize > MaxPartSize.
+	// With the current S3 API specifications we will not run into this
+	// problem, but these specs are subject to change. There are also other
+	// stores implementing the S3 API (e.g. Riak CS, Ceph RadosGW) which
+	// might use different settings.
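+	//
+	// A worked example (hypothetical limits, purely for illustration): with
+	// MaxMultipartParts = 10, a size of 105 gives 105/10 = 10 with remainder
+	// 5, so we round up to optimalPartSize = 11 and upload nine parts of 11
+	// bytes plus a last part of 6 bytes. A size of 100 leaves no remainder,
+	// so optimalPartSize = 10 fits exactly.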
+	case size%store.MaxMultipartParts == 0:
+		optimalPartSize = size / store.MaxMultipartParts
+	// A remainder larger than 0 means the exact result would have digits
+	// after the decimal point (e.g. something like 10.9). In that case we
+	// can only squeeze the upload into MaxMultipartParts parts if we round
+	// the division's result UP. That is what happens here: we round up by
+	// adding 1 whenever the prior test for (remainder == 0) did not succeed.
+	default:
+		optimalPartSize = size/store.MaxMultipartParts + 1
+	}
+
+	// optimalPartSize must never exceed MaxPartSize
+	if optimalPartSize > store.MaxPartSize {
+		return optimalPartSize, fmt.Errorf("calcOptimalPartSize: to upload %v bytes optimalPartSize %v must exceed MaxPartSize %v", size, optimalPartSize, store.MaxPartSize)
+	}
+	return optimalPartSize, nil
+}
diff --git a/s3store/s3store_mock_test.go b/s3store/s3store_mock_test.go
index 4f75ba7..0fc2125 100644
--- a/s3store/s3store_mock_test.go
+++ b/s3store/s3store_mock_test.go
@@ -1,7 +1,7 @@
 // Automatically generated by MockGen. DO NOT EDIT!
 // Source: github.com/tus/tusd/s3store (interfaces: S3API)
 
-package s3store_test
+package s3store
 
 import (
 	s3 "github.com/aws/aws-sdk-go/service/s3"
diff --git a/s3store/s3store_test.go b/s3store/s3store_test.go
index 2debfa5..f9729e0 100644
--- a/s3store/s3store_test.go
+++ b/s3store/s3store_test.go
@@ -1,7 +1,8 @@
-package s3store_test
+package s3store
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
 	"testing"
 
@@ -12,17 +13,16 @@ import (
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/tus/tusd"
-	"github.com/tus/tusd/s3store"
 )
 
-//go:generate mockgen -destination=./s3store_mock_test.go -package=s3store_test github.com/tus/tusd/s3store S3API
+//go:generate mockgen -destination=./s3store_mock_test.go -package=s3store github.com/tus/tusd/s3store S3API
 
 // Test interface implementations
-var _ tusd.DataStore = s3store.S3Store{}
-var _ tusd.GetReaderDataStore = s3store.S3Store{}
-var _ tusd.TerminaterDataStore = s3store.S3Store{}
-var _ tusd.FinisherDataStore = s3store.S3Store{}
-var _ tusd.ConcaterDataStore = s3store.S3Store{}
+var _ tusd.DataStore = S3Store{}
+var _ tusd.GetReaderDataStore = S3Store{}
+var _ tusd.TerminaterDataStore = S3Store{}
+var _ tusd.FinisherDataStore = S3Store{}
+var _ tusd.ConcaterDataStore = S3Store{}
 
 func TestNewUpload(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
@@ -30,7 +30,7 @@ func TestNewUpload(t *testing.T) {
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	assert.Equal("bucket", store.Bucket)
 	assert.Equal(s3obj, store.Service)
@@ -71,13 +71,35 @@ func TestNewUpload(t *testing.T) {
 	assert.Equal("uploadId+multipartId", id)
 }
 
+func TestNewUploadLargerMaxObjectSize(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := New("bucket", s3obj)
+
+	assert.Equal("bucket", store.Bucket)
+	assert.Equal(s3obj, store.Service)
+
+	info := tusd.FileInfo{
+		ID:   "uploadId",
+		Size: store.MaxObjectSize + 1,
+	}
+
+	id, err := store.NewUpload(info)
+	assert.NotNil(err)
+	assert.EqualError(err, fmt.Sprintf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize))
+	assert.Equal("", id)
+}
+
 func TestGetInfoNotFound(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	s3obj.EXPECT().GetObject(&s3.GetObjectInput{
 		Bucket: aws.String("bucket"),
@@ -94,7 +116,7 @@ func TestGetInfo(t *testing.T) {
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	gomock.InOrder(
 		s3obj.EXPECT().GetObject(&s3.GetObjectInput{
@@ -104,9 +126,10 @@ func TestGetInfo(t *testing.T) {
 		}).Return(&s3.GetObjectOutput{
 			Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
 		}, nil),
 		s3obj.EXPECT().ListParts(&s3.ListPartsInput{
-			Bucket:   aws.String("bucket"),
-			Key:      aws.String("uploadId"),
-			UploadId: aws.String("multipartId"),
+			Bucket:           aws.String("bucket"),
+			Key:              aws.String("uploadId"),
+			UploadId:         aws.String("multipartId"),
+			PartNumberMarker: aws.Int64(0),
 		}).Return(&s3.ListPartsOutput{
 			Parts: []*s3.Part{
 				{
@@ -116,13 +139,27 @@ func TestGetInfo(t *testing.T) {
 					Size: aws.Int64(200),
 				},
 			},
+			NextPartNumberMarker: aws.Int64(2),
+			IsTruncated:          aws.Bool(true),
+		}, nil),
+		s3obj.EXPECT().ListParts(&s3.ListPartsInput{
+			Bucket:           aws.String("bucket"),
+			Key:              aws.String("uploadId"),
+			UploadId:         aws.String("multipartId"),
+			PartNumberMarker: aws.Int64(2),
+		}).Return(&s3.ListPartsOutput{
+			Parts: []*s3.Part{
+				{
+					Size: aws.Int64(100),
+				},
+			},
 		}, nil),
 	)
 
 	info, err := store.GetInfo("uploadId+multipartId")
 	assert.Nil(err)
 	assert.Equal(int64(500), info.Size)
-	assert.Equal(int64(300), info.Offset)
+	assert.Equal(int64(400), info.Offset)
 	assert.Equal("uploadId+multipartId", info.ID)
 	assert.Equal("hello", info.MetaData["foo"])
 	assert.Equal("menü", info.MetaData["bar"])
@@ -134,7 +171,7 @@ func TestGetInfoFinished(t *testing.T) {
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	gomock.InOrder(
 		s3obj.EXPECT().GetObject(&s3.GetObjectInput{
@@ -144,9 +181,10 @@ func TestGetInfoFinished(t *testing.T) {
 		}).Return(&s3.GetObjectOutput{
 			Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
 		}, nil),
 		s3obj.EXPECT().ListParts(&s3.ListPartsInput{
-			Bucket:   aws.String("bucket"),
-			Key:      aws.String("uploadId"),
-			UploadId: aws.String("multipartId"),
+			Bucket:           aws.String("bucket"),
+			Key:              aws.String("uploadId"),
+			UploadId:         aws.String("multipartId"),
+			PartNumberMarker: aws.Int64(0),
 		}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)),
 	)
 
@@ -162,7 +200,7 @@ func TestGetReader(t *testing.T) {
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	s3obj.EXPECT().GetObject(&s3.GetObjectInput{
 		Bucket: aws.String("bucket"),
 		Key:    aws.String("uploadId"),
@@ -182,7 +220,7 @@ func TestGetReaderNotFound(t *testing.T) {
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	gomock.InOrder(
 		s3obj.EXPECT().GetObject(&s3.GetObjectInput{
@@ -208,7 +246,7 @@ func TestGetReaderNotFinished(t *testing.T) {
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	gomock.InOrder(
 		s3obj.EXPECT().GetObject(&s3.GetObjectInput{
@@ -236,13 +274,14 @@ func TestFinishUpload(t *testing.T) {
 	assert := assert.New(t)
 
 	s3obj := NewMockS3API(mockCtrl)
-	store := s3store.New("bucket", s3obj)
+	store := New("bucket", s3obj)
 
 	gomock.InOrder(
 		s3obj.EXPECT().ListParts(&s3.ListPartsInput{
-			Bucket:   aws.String("bucket"),
-			Key:      aws.String("uploadId"),
-			UploadId: aws.String("multipartId"),
+			Bucket:           aws.String("bucket"),
+			Key:              aws.String("uploadId"),
+			UploadId:         aws.String("multipartId"),
+			PartNumberMarker: aws.Int64(0),
 		}).Return(&s3.ListPartsOutput{
 			Parts: []*s3.Part{
 				{
@@ -256,6 +295,22 @@ func TestFinishUpload(t *testing.T) {
 					PartNumber: aws.Int64(2),
 				},
 			},
+			NextPartNumberMarker: aws.Int64(2),
+			IsTruncated:          aws.Bool(true),
+		}, nil),
+		s3obj.EXPECT().ListParts(&s3.ListPartsInput{
+			Bucket:           aws.String("bucket"),
+			Key:              aws.String("uploadId"),
+			UploadId:         aws.String("multipartId"),
+			PartNumberMarker: aws.Int64(2),
+		}).Return(&s3.ListPartsOutput{
+			Parts: []*s3.Part{
+				{
+					Size:       aws.Int64(100),
+					ETag:       aws.String("foobar"),
+					PartNumber: aws.Int64(3),
+				},
+			},
 		}, nil),
 		s3obj.EXPECT().CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
 			Bucket:   aws.String("bucket"),
@@ -271,6 +326,10 @@ func TestFinishUpload(t *testing.T) {
 						ETag:       aws.String("bar"),
 						PartNumber: aws.Int64(2),
 					},
+					{
+ ETag: aws.String("foobar"), + PartNumber: aws.Int64(3), + }, }, }, }).Return(nil, nil), @@ -286,9 +345,11 @@ func TestWriteChunk(t *testing.T) { assert := assert.New(t) s3obj := NewMockS3API(mockCtrl) - store := s3store.New("bucket", s3obj) - store.MaxPartSize = 4 - store.MinPartSize = 2 + store := New("bucket", s3obj) + store.MaxPartSize = 8 + store.MinPartSize = 4 + store.MaxMultipartParts = 10000 + store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 gomock.InOrder( s3obj.EXPECT().GetObject(&s3.GetObjectInput{ @@ -298,9 +359,10 @@ func TestWriteChunk(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -312,9 +374,10 @@ func TestWriteChunk(t *testing.T) { }, }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -344,13 +407,15 @@ func TestWriteChunk(t *testing.T) { Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int64(5), - Body: bytes.NewReader([]byte("90")), + Body: bytes.NewReader([]byte("90AB")), })).Return(nil, nil), ) - bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890"))) + // The last bytes "CD" will be ignored, as they are not the last bytes of the + // upload (500 bytes total) and not of full part-size. 
+ bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD"))) assert.Nil(err) - assert.Equal(int64(10), bytesRead) + assert.Equal(int64(12), bytesRead) } func TestWriteChunkDropTooSmall(t *testing.T) { @@ -359,7 +424,7 @@ func TestWriteChunkDropTooSmall(t *testing.T) { assert := assert.New(t) s3obj := NewMockS3API(mockCtrl) - store := s3store.New("bucket", s3obj) + store := New("bucket", s3obj) gomock.InOrder( s3obj.EXPECT().GetObject(&s3.GetObjectInput{ @@ -369,9 +434,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -383,9 +449,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) { }, }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -409,7 +476,7 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { assert := assert.New(t) s3obj := NewMockS3API(mockCtrl) - store := s3store.New("bucket", s3obj) + store := New("bucket", s3obj) store.MinPartSize = 20 gomock.InOrder( @@ -420,9 +487,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -434,9 +502,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { }, }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -470,7 +539,7 @@ func TestTerminate(t *testing.T) { assert := assert.New(t) s3obj := NewMockS3API(mockCtrl) - store := s3store.New("bucket", s3obj) + store := New("bucket", s3obj) // Order is not important in this situation. s3obj.EXPECT().AbortMultipartUpload(&s3.AbortMultipartUploadInput{ @@ -504,7 +573,7 @@ func TestTerminateWithErrors(t *testing.T) { assert := assert.New(t) s3obj := NewMockS3API(mockCtrl) - store := s3store.New("bucket", s3obj) + store := New("bucket", s3obj) // Order is not important in this situation. 
// NoSuchUpload errors should be ignored @@ -547,7 +616,7 @@ func TestConcatUploads(t *testing.T) { assert := assert.New(t) s3obj := NewMockS3API(mockCtrl) - store := s3store.New("bucket", s3obj) + store := New("bucket", s3obj) s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{ Bucket: aws.String("bucket"), @@ -576,9 +645,10 @@ func TestConcatUploads(t *testing.T) { // Output from s3Store.FinishUpload gomock.InOrder( s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { diff --git a/s3store/s3store_util_test.go b/s3store/s3store_util_test.go index dde80d5..10553d6 100644 --- a/s3store/s3store_util_test.go +++ b/s3store/s3store_util_test.go @@ -1,4 +1,4 @@ -package s3store_test +package s3store import ( "fmt"