s3store: calculate part size based on upload size
See https://github.com/tus/tusd/issues/149 and https://github.com/tus/tusd/pull/150 for more details. Squashed commit of the following: commit 78312ab26ea7ee664038e5b5d362bd534bfe0e37 Author: Marius <maerious@gmail.com> Date: Fri Sep 1 19:49:48 2017 +0200 Correct error assertions for exceeding max part size commit 9350712c0a46651e6a7a91d8819307ba4b08ec7e Author: Marius <maerious@gmail.com> Date: Fri Sep 1 19:44:28 2017 +0200 Make CalcOptimalPartSize unexported commit 593f3b2d37d16c51f229572c1d6b39fc2a234079 Author: Marius <maerious@gmail.com> Date: Fri Sep 1 19:38:46 2017 +0200 Add more output for debugging tests commit b7193bfe67b535c9b9dd441610b41af11fe4538f Author: Marius <maerious@gmail.com> Date: Fri Sep 1 19:35:48 2017 +0200 Extract size assertions into own function commit 7521de23194652519fbbf3d61a41ef0b44b005fa Author: Marius <maerious@gmail.com> Date: Fri Sep 1 19:26:48 2017 +0200 Move tests for CalcPartSize into own file commit 6c483de7710cc119c870271ccad629c98c15c9a3 Author: Marius <maerious@gmail.com> Date: Fri Sep 1 19:13:02 2017 +0200 Use same assertions in AllUploadSizes test commit 7b0290a07e7def09ea8ed982e7817a2ea7cd468a Author: Marius <maerious@gmail.com> Date: Fri Sep 1 18:30:02 2017 +0200 Split negative test case from TestCalcOptimalPartSize into own test commit 79c0a20d7bc71b494bc0824ad2aa8879b0c2900b Merge: 5240f9b997961f
Author: Marius <maerious@gmail.com> Date: Fri Sep 1 17:32:31 2017 +0200 Merge branch 'f-s3-part-size' of https://github.com/flaneurtv/tusd into flaneurtv-f-s3-part-size commit997961ff5c
Author: Markus Kienast <mark@rickkiste.at> Date: Fri Sep 1 00:59:38 2017 +0200 TestNewUploadLargerMaxObjectSize commit0831bd79f8
Author: Markus Kienast <mark@rickkiste.at> Date: Thu Aug 31 23:08:03 2017 +0200 fmt.Sprintf removed, range from 0 - MaxObjectSize+1 commit1be7081524
Author: Markus Kienast <mark@rickkiste.at> Date: Tue Aug 29 10:23:50 2017 +0200 turn off debug mode commitbe9a9bec10
Author: Markus Kienast <mark@rickkiste.at> Date: Tue Aug 29 10:12:20 2017 +0200 moved MaxObjectSize check to NewUpload, refined tests * moved MaxObjectSize check to NewUpload * removed MaxObjectSize check from CalcOptimalPartSize * switched to assert in tests * added TestAllPartSizes, excluded in short mode TODO: TestNewUploadLargerMaxObjectSize needs to fail if MaxObjectSize > size commit7c22847a45
Author: Markus Kienast <mark@rickkiste.at> Date: Sat Aug 26 12:55:07 2017 +0200 adding debug code to TestCalcOptimalPartSize commit 5240f9b549000fac34be79ddfbe6e82404387f6b Merge: 63c011e5b116e7
Author: Marius <maerious@gmail.com> Date: Sat Aug 26 12:50:51 2017 +0200 Merge branch 'f-s3-part-size' of https://github.com/flaneurtv/tusd into flaneurtv-f-s3-part-size commit 63c011ef768db42e99004df921c2b9e5c4776fd2 Author: Marius <maerious@gmail.com> Date: Sat Aug 26 12:50:45 2017 +0200 Format s3store_test commit5b116e7087
Author: Markus Kienast <mark@rickkiste.at> Date: Sat Aug 26 12:24:22 2017 +0200 restructuring tests to accommodate optimalPartSize of 0 commit93134a5696
Author: Markus Kienast <mark@rickkiste.at> Date: Sat Aug 26 12:03:18 2017 +0200 moving MaxObjectSize check to top commit68e6bb8c41
Author: Markus Kienast <mark@rickkiste.at> Date: Sat Aug 26 02:31:27 2017 +0200 enhance readability, comments and errors commit8831a98c34
Author: Markus Kienast <mark@rickkiste.at> Date: Thu Aug 24 02:27:57 2017 +0200 separated partsize calc and error handling commitf059acc7cc
Author: Markus Kienast <mark@rickkiste.at> Date: Thu Aug 24 01:29:26 2017 +0200 fixed edge cases; pre-cleanup commite2e3b9ffe4
Author: Markus Kienast <mark@rickkiste.at> Date: Wed Aug 23 13:28:59 2017 +0200 added error, when size > MaxObjectSize; additional case in algorithm + tests; go fmt commit381d3326cb
Author: Markus Kienast <mark@rickkiste.at> Date: Thu Aug 17 16:32:25 2017 +0200 calculating PartSize based on size of upload simplified algorithm, respect MaxObjectSize, updated tests, go fmt commit1ad6187d6d
Author: koenvo <info@koenvossen.nl> Date: Thu Aug 17 21:31:37 2017 +0200 Take IsTruncated field of S3 ListParts API response into account (#148) * Take IsTruncated field of S3 ListParts API response into account * Rename s3store.ListParts to ListAllParts * Use proper formatting + make listAllParts private + test listAllParts through TestGetInfo * Update TestFinishUpload to also test paged ListParts response
This commit is contained in:
parent
a51f5994bb
commit
2df5d11672
|
@ -0,0 +1,165 @@
|
|||
package s3store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const enableTestDebugOutput = false
|
||||
|
||||
func assertCalculatedPartSize(store S3Store, assert *assert.Assertions, size int64) {
|
||||
optimalPartSize, err := store.calcOptimalPartSize(size)
|
||||
assert.Nil(err, "Size %d, no error should be returned.\n", size)
|
||||
|
||||
// Number of parts with the same size
|
||||
equalparts := size / optimalPartSize
|
||||
// Size of the last part (or 0 if no spare part is needed)
|
||||
lastpartSize := size % optimalPartSize
|
||||
|
||||
prelude := fmt.Sprintf("Size %d, %d parts of size %d, lastpart %d: ", size, equalparts, optimalPartSize, lastpartSize)
|
||||
|
||||
assert.False(optimalPartSize < store.MinPartSize, prelude+"optimalPartSize < MinPartSize %d.\n", store.MinPartSize)
|
||||
assert.False(optimalPartSize > store.MaxPartSize, prelude+"optimalPartSize > MaxPartSize %d.\n", store.MaxPartSize)
|
||||
assert.False(lastpartSize == 0 && equalparts > store.MaxMultipartParts, prelude+"more parts than MaxMultipartParts %d.\n", store.MaxMultipartParts)
|
||||
assert.False(lastpartSize > 0 && equalparts > store.MaxMultipartParts-1, prelude+"more parts than MaxMultipartParts %d.\n", store.MaxMultipartParts)
|
||||
assert.False(lastpartSize > store.MaxPartSize, prelude+"lastpart > MaxPartSize %d.\n", store.MaxPartSize)
|
||||
assert.False(lastpartSize > optimalPartSize, prelude+"lastpart > optimalPartSize %d.\n", optimalPartSize)
|
||||
assert.True(size <= optimalPartSize*store.MaxMultipartParts, prelude+"upload does not fit in %d parts.\n", store.MaxMultipartParts)
|
||||
|
||||
if enableTestDebugOutput {
|
||||
fmt.Printf(prelude+"does exceed MaxObjectSize: %t.\n", size > store.MaxObjectSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCalcOptimalPartSize(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
// If you quickly want to override the default values in this test
|
||||
/*
|
||||
store.MinPartSize = 2
|
||||
store.MaxPartSize = 10
|
||||
store.MaxMultipartParts = 20
|
||||
store.MaxObjectSize = 200
|
||||
*/
|
||||
|
||||
// sanity check
|
||||
if store.MaxObjectSize > store.MaxPartSize*store.MaxMultipartParts {
|
||||
t.Errorf("MaxObjectSize %v can never be achieved, as MaxMultipartParts %v and MaxPartSize %v only allow for an upload of %v bytes total.\n", store.MaxObjectSize, store.MaxMultipartParts, store.MaxPartSize, store.MaxMultipartParts*store.MaxPartSize)
|
||||
}
|
||||
|
||||
HighestApplicablePartSize := store.MaxObjectSize / store.MaxMultipartParts
|
||||
if store.MaxObjectSize%store.MaxMultipartParts > 0 {
|
||||
HighestApplicablePartSize++
|
||||
}
|
||||
RemainderWithHighestApplicablePartSize := store.MaxObjectSize % HighestApplicablePartSize
|
||||
|
||||
// some of these tests are actually duplicates, as they specify the same size
|
||||
// in bytes - two ways to describe the same thing. That is wanted, in order
|
||||
// to provide a full picture from any angle.
|
||||
testcases := []int64{
|
||||
0,
|
||||
1,
|
||||
store.MinPartSize - 1,
|
||||
store.MinPartSize,
|
||||
store.MinPartSize + 1,
|
||||
|
||||
store.MinPartSize*(store.MaxMultipartParts-1) - 1,
|
||||
store.MinPartSize * (store.MaxMultipartParts - 1),
|
||||
store.MinPartSize*(store.MaxMultipartParts-1) + 1,
|
||||
|
||||
store.MinPartSize*store.MaxMultipartParts - 1,
|
||||
store.MinPartSize * store.MaxMultipartParts,
|
||||
store.MinPartSize*store.MaxMultipartParts + 1,
|
||||
|
||||
store.MinPartSize*(store.MaxMultipartParts+1) - 1,
|
||||
store.MinPartSize * (store.MaxMultipartParts + 1),
|
||||
store.MinPartSize*(store.MaxMultipartParts+1) + 1,
|
||||
|
||||
(HighestApplicablePartSize-1)*store.MaxMultipartParts - 1,
|
||||
(HighestApplicablePartSize - 1) * store.MaxMultipartParts,
|
||||
(HighestApplicablePartSize-1)*store.MaxMultipartParts + 1,
|
||||
|
||||
HighestApplicablePartSize*(store.MaxMultipartParts-1) - 1,
|
||||
HighestApplicablePartSize * (store.MaxMultipartParts - 1),
|
||||
HighestApplicablePartSize*(store.MaxMultipartParts-1) + 1,
|
||||
|
||||
HighestApplicablePartSize*(store.MaxMultipartParts-1) + RemainderWithHighestApplicablePartSize - 1,
|
||||
HighestApplicablePartSize*(store.MaxMultipartParts-1) + RemainderWithHighestApplicablePartSize,
|
||||
HighestApplicablePartSize*(store.MaxMultipartParts-1) + RemainderWithHighestApplicablePartSize + 1,
|
||||
|
||||
store.MaxObjectSize - 1,
|
||||
store.MaxObjectSize,
|
||||
store.MaxObjectSize + 1,
|
||||
|
||||
(store.MaxObjectSize/store.MaxMultipartParts)*(store.MaxMultipartParts-1) - 1,
|
||||
(store.MaxObjectSize / store.MaxMultipartParts) * (store.MaxMultipartParts - 1),
|
||||
(store.MaxObjectSize/store.MaxMultipartParts)*(store.MaxMultipartParts-1) + 1,
|
||||
|
||||
store.MaxPartSize*(store.MaxMultipartParts-1) - 1,
|
||||
store.MaxPartSize * (store.MaxMultipartParts - 1),
|
||||
store.MaxPartSize*(store.MaxMultipartParts-1) + 1,
|
||||
|
||||
store.MaxPartSize*store.MaxMultipartParts - 1,
|
||||
store.MaxPartSize * store.MaxMultipartParts,
|
||||
// We cannot calculate a part size for store.MaxPartSize*store.MaxMultipartParts + 1
|
||||
// This case is tested in TestCalcOptimalPartSize_ExceedingMaxPartSize
|
||||
}
|
||||
|
||||
for _, size := range testcases {
|
||||
assertCalculatedPartSize(store, assert, size)
|
||||
}
|
||||
|
||||
if enableTestDebugOutput {
|
||||
fmt.Println("HighestApplicablePartSize", HighestApplicablePartSize)
|
||||
fmt.Println("RemainderWithHighestApplicablePartSize", RemainderWithHighestApplicablePartSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCalcOptimalPartSize_AllUploadSizes(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
store.MinPartSize = 5
|
||||
store.MaxPartSize = 5 * 1024
|
||||
store.MaxMultipartParts = 1000
|
||||
store.MaxObjectSize = store.MaxPartSize * store.MaxMultipartParts
|
||||
|
||||
// sanity check
|
||||
if store.MaxObjectSize > store.MaxPartSize*store.MaxMultipartParts {
|
||||
t.Errorf("MaxObjectSize %v can never be achieved, as MaxMultipartParts %v and MaxPartSize %v only allow for an upload of %v bytes total.\n", store.MaxObjectSize, store.MaxMultipartParts, store.MaxPartSize, store.MaxMultipartParts*store.MaxPartSize)
|
||||
}
|
||||
|
||||
for size := int64(0); size <= store.MaxObjectSize; size++ {
|
||||
assertCalculatedPartSize(store, assert, size)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCalcOptimalPartSize_ExceedingMaxPartSize(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
size := store.MaxPartSize*store.MaxMultipartParts + 1
|
||||
|
||||
optimalPartSize, err := store.calcOptimalPartSize(size)
|
||||
assert.NotNil(err)
|
||||
assert.EqualError(err, fmt.Sprintf("calcOptimalPartSize: to upload %v bytes optimalPartSize %v must exceed MaxPartSize %v", size, optimalPartSize, store.MaxPartSize))
|
||||
}
|
|
@ -128,6 +128,13 @@ type S3Store struct {
|
|||
// in bytes. This number needs to match with the underlying S3 backend or else
|
||||
// uploaded parts will be reject. AWS S3, for example, uses 5MB for this value.
|
||||
MinPartSize int64
|
||||
// MaxMultipartParts is the maximum number of parts an S3 multipart upload is
|
||||
// allowed to have according to AWS S3 API specifications.
|
||||
// See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
|
||||
MaxMultipartParts int64
|
||||
// MaxObjectSize is the maximum size an S3 Object can have according to S3
|
||||
// API specifications. See link above.
|
||||
MaxObjectSize int64
|
||||
}
|
||||
|
||||
type S3API interface {
|
||||
|
@ -148,8 +155,10 @@ func New(bucket string, service S3API) S3Store {
|
|||
return S3Store{
|
||||
Bucket: bucket,
|
||||
Service: service,
|
||||
MaxPartSize: 6 * 1024 * 1024,
|
||||
MaxPartSize: 5 * 1024 * 1024 * 1024,
|
||||
MinPartSize: 5 * 1024 * 1024,
|
||||
MaxMultipartParts: 10000,
|
||||
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -164,6 +173,11 @@ func (store S3Store) UseIn(composer *tusd.StoreComposer) {
|
|||
}
|
||||
|
||||
func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
|
||||
// an upload larger than MaxObjectSize must throw an error
|
||||
if info.Size > store.MaxObjectSize {
|
||||
return "", fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize)
|
||||
}
|
||||
|
||||
var uploadId string
|
||||
if info.ID == "" {
|
||||
uploadId = uid.Uid()
|
||||
|
@ -224,19 +238,18 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
|||
|
||||
size := info.Size
|
||||
bytesUploaded := int64(0)
|
||||
optimalPartSize, err := store.calcOptimalPartSize(size)
|
||||
if err != nil {
|
||||
return bytesUploaded, err
|
||||
}
|
||||
|
||||
// Get number of parts to generate next number
|
||||
listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: aws.String(uploadId),
|
||||
UploadId: aws.String(multipartId),
|
||||
})
|
||||
parts, err := store.listAllParts(id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
list := *listPtr
|
||||
numParts := len(list.Parts)
|
||||
numParts := len(parts)
|
||||
nextPartNum := int64(numParts + 1)
|
||||
|
||||
for {
|
||||
|
@ -248,7 +261,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
|||
defer os.Remove(file.Name())
|
||||
defer file.Close()
|
||||
|
||||
limitedReader := io.LimitReader(src, store.MaxPartSize)
|
||||
limitedReader := io.LimitReader(src, optimalPartSize)
|
||||
n, err := io.Copy(file, limitedReader)
|
||||
// io.Copy does not return io.EOF, so we not have to handle it differently.
|
||||
if err != nil {
|
||||
|
@ -259,11 +272,11 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
|||
return bytesUploaded, nil
|
||||
}
|
||||
|
||||
if (size - offset) <= store.MinPartSize {
|
||||
if (size - offset) <= optimalPartSize {
|
||||
if (size - offset) != n {
|
||||
return bytesUploaded, nil
|
||||
}
|
||||
} else if n < store.MinPartSize {
|
||||
} else if n < optimalPartSize {
|
||||
return bytesUploaded, nil
|
||||
}
|
||||
|
||||
|
@ -288,7 +301,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
|||
}
|
||||
|
||||
func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
|
||||
uploadId, multipartId := splitIds(id)
|
||||
uploadId, _ := splitIds(id)
|
||||
|
||||
// Get file info stored in separate object
|
||||
res, err := store.Service.GetObject(&s3.GetObjectInput{
|
||||
|
@ -308,11 +321,7 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
|
|||
}
|
||||
|
||||
// Get uploaded parts and their offset
|
||||
listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: aws.String(uploadId),
|
||||
UploadId: aws.String(multipartId),
|
||||
})
|
||||
parts, err := store.listAllParts(id)
|
||||
if err != nil {
|
||||
// Check if the error is caused by the upload not being found. This happens
|
||||
// when the multipart upload has already been completed or aborted. Since
|
||||
|
@ -326,11 +335,9 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
|
|||
}
|
||||
}
|
||||
|
||||
list := *listPtr
|
||||
|
||||
offset := int64(0)
|
||||
|
||||
for _, part := range list.Parts {
|
||||
for _, part := range parts {
|
||||
offset += *part.Size
|
||||
}
|
||||
|
||||
|
@ -444,22 +451,17 @@ func (store S3Store) FinishUpload(id string) error {
|
|||
uploadId, multipartId := splitIds(id)
|
||||
|
||||
// Get uploaded parts
|
||||
listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: aws.String(uploadId),
|
||||
UploadId: aws.String(multipartId),
|
||||
})
|
||||
parts, err := store.listAllParts(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next
|
||||
// request.
|
||||
list := *listPtr
|
||||
parts := make([]*s3.CompletedPart, len(list.Parts))
|
||||
completedParts := make([]*s3.CompletedPart, len(parts))
|
||||
|
||||
for index, part := range list.Parts {
|
||||
parts[index] = &s3.CompletedPart{
|
||||
for index, part := range parts {
|
||||
completedParts[index] = &s3.CompletedPart{
|
||||
ETag: part.ETag,
|
||||
PartNumber: part.PartNumber,
|
||||
}
|
||||
|
@ -470,7 +472,7 @@ func (store S3Store) FinishUpload(id string) error {
|
|||
Key: aws.String(uploadId),
|
||||
UploadId: aws.String(multipartId),
|
||||
MultipartUpload: &s3.CompletedMultipartUpload{
|
||||
Parts: parts,
|
||||
Parts: completedParts,
|
||||
},
|
||||
})
|
||||
|
||||
|
@ -517,6 +519,33 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
|
|||
return store.FinishUpload(dest)
|
||||
}
|
||||
|
||||
func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) {
|
||||
uploadId, multipartId := splitIds(id)
|
||||
|
||||
partMarker := int64(0)
|
||||
for {
|
||||
// Get uploaded parts
|
||||
listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: aws.String(uploadId),
|
||||
UploadId: aws.String(multipartId),
|
||||
PartNumberMarker: aws.Int64(partMarker),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
parts = append(parts, (*listPtr).Parts...)
|
||||
|
||||
if listPtr.IsTruncated != nil && *listPtr.IsTruncated {
|
||||
partMarker = *listPtr.NextPartNumberMarker
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return parts, nil
|
||||
}
|
||||
|
||||
func splitIds(id string) (uploadId, multipartId string) {
|
||||
index := strings.Index(id, "+")
|
||||
if index == -1 {
|
||||
|
@ -536,3 +565,47 @@ func isAwsError(err error, code string) bool {
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (store S3Store) calcOptimalPartSize(size int64) (optimalPartSize int64, err error) {
|
||||
switch {
|
||||
// When upload is smaller or equal MinPartSize, we upload in just one part.
|
||||
case size <= store.MinPartSize:
|
||||
optimalPartSize = store.MinPartSize
|
||||
// Does the upload fit in MaxMultipartParts parts or less with MinPartSize.
|
||||
case size <= store.MinPartSize*store.MaxMultipartParts:
|
||||
optimalPartSize = store.MinPartSize
|
||||
// Prerequisite: Be aware, that the result of an integer division (x/y) is
|
||||
// ALWAYS rounded DOWN, as there are no digits behind the comma.
|
||||
// In order to find out, whether we have an exact result or a rounded down
|
||||
// one, we can check, whether the remainder of that division is 0 (x%y == 0).
|
||||
//
|
||||
// So if the result of (size/MaxMultipartParts) is not a rounded down value,
|
||||
// then we can use it as our optimalPartSize. But if this division produces a
|
||||
// remainder, we have to round up the result by adding +1. Otherwise our
|
||||
// upload would not fit into MaxMultipartParts number of parts with that
|
||||
// size. We would need an additional part in order to upload everything.
|
||||
// While in almost all cases, we could skip the check for the remainder and
|
||||
// just add +1 to every result, but there is one case, where doing that would
|
||||
// doom our upload. When (MaxObjectSize == MaxPartSize * MaxMultipartParts),
|
||||
// by adding +1, we would end up with an optimalPartSize > MaxPartSize.
|
||||
// With the current S3 API specifications, we will not run into this problem,
|
||||
// but these specs are subject to change, and there are other stores as well,
|
||||
// which are implementing the S3 API (e.g. RIAK, Ceph RadosGW), but might
|
||||
// have different settings.
|
||||
case size%store.MaxMultipartParts == 0:
|
||||
optimalPartSize = size / store.MaxMultipartParts
|
||||
// Having a remainder larger than 0 means, the float result would have
|
||||
// digits after the comma (e.g. be something like 10.9). As a result, we can
|
||||
// only squeeze our upload into MaxMultipartParts parts, if we rounded UP
|
||||
// this division's result. That is what is happending here. We round up by
|
||||
// adding +1, if the prior test for (remainder == 0) did not succeed.
|
||||
default:
|
||||
optimalPartSize = size/store.MaxMultipartParts + 1
|
||||
}
|
||||
|
||||
// optimalPartSize must never exceed MaxPartSize
|
||||
if optimalPartSize > store.MaxPartSize {
|
||||
return optimalPartSize, fmt.Errorf("calcOptimalPartSize: to upload %v bytes optimalPartSize %v must exceed MaxPartSize %v", size, optimalPartSize, store.MaxPartSize)
|
||||
}
|
||||
return optimalPartSize, nil
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
// Automatically generated by MockGen. DO NOT EDIT!
|
||||
// Source: github.com/tus/tusd/s3store (interfaces: S3API)
|
||||
|
||||
package s3store_test
|
||||
package s3store
|
||||
|
||||
import (
|
||||
s3 "github.com/aws/aws-sdk-go/service/s3"
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
package s3store_test
|
||||
package s3store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
|
@ -12,17 +13,16 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/tus/tusd"
|
||||
"github.com/tus/tusd/s3store"
|
||||
)
|
||||
|
||||
//go:generate mockgen -destination=./s3store_mock_test.go -package=s3store_test github.com/tus/tusd/s3store S3API
|
||||
//go:generate mockgen -destination=./s3store_mock_test.go -package=s3store github.com/tus/tusd/s3store S3API
|
||||
|
||||
// Test interface implementations
|
||||
var _ tusd.DataStore = s3store.S3Store{}
|
||||
var _ tusd.GetReaderDataStore = s3store.S3Store{}
|
||||
var _ tusd.TerminaterDataStore = s3store.S3Store{}
|
||||
var _ tusd.FinisherDataStore = s3store.S3Store{}
|
||||
var _ tusd.ConcaterDataStore = s3store.S3Store{}
|
||||
var _ tusd.DataStore = S3Store{}
|
||||
var _ tusd.GetReaderDataStore = S3Store{}
|
||||
var _ tusd.TerminaterDataStore = S3Store{}
|
||||
var _ tusd.FinisherDataStore = S3Store{}
|
||||
var _ tusd.ConcaterDataStore = S3Store{}
|
||||
|
||||
func TestNewUpload(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
|
@ -30,7 +30,7 @@ func TestNewUpload(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
assert.Equal("bucket", store.Bucket)
|
||||
assert.Equal(s3obj, store.Service)
|
||||
|
@ -71,13 +71,35 @@ func TestNewUpload(t *testing.T) {
|
|||
assert.Equal("uploadId+multipartId", id)
|
||||
}
|
||||
|
||||
func TestNewUploadLargerMaxObjectSize(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
assert.Equal("bucket", store.Bucket)
|
||||
assert.Equal(s3obj, store.Service)
|
||||
|
||||
info := tusd.FileInfo{
|
||||
ID: "uploadId",
|
||||
Size: store.MaxObjectSize + 1,
|
||||
}
|
||||
|
||||
id, err := store.NewUpload(info)
|
||||
assert.NotNil(err)
|
||||
assert.EqualError(err, fmt.Sprintf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize))
|
||||
assert.Equal("", id)
|
||||
}
|
||||
|
||||
func TestGetInfoNotFound(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
|
@ -94,7 +116,7 @@ func TestGetInfo(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
|
@ -107,6 +129,7 @@ func TestGetInfo(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -116,13 +139,27 @@ func TestGetInfo(t *testing.T) {
|
|||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
NextPartNumberMarker: aws.Int64(2),
|
||||
IsTruncated: aws.Bool(true),
|
||||
}, nil),
|
||||
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(2),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
Size: aws.Int64(100),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
)
|
||||
|
||||
info, err := store.GetInfo("uploadId+multipartId")
|
||||
assert.Nil(err)
|
||||
assert.Equal(int64(500), info.Size)
|
||||
assert.Equal(int64(300), info.Offset)
|
||||
assert.Equal(int64(400), info.Offset)
|
||||
assert.Equal("uploadId+multipartId", info.ID)
|
||||
assert.Equal("hello", info.MetaData["foo"])
|
||||
assert.Equal("menü", info.MetaData["bar"])
|
||||
|
@ -134,7 +171,7 @@ func TestGetInfoFinished(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
|
@ -147,6 +184,7 @@ func TestGetInfoFinished(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)),
|
||||
)
|
||||
|
||||
|
@ -162,7 +200,7 @@ func TestGetReader(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
|
@ -182,7 +220,7 @@ func TestGetReaderNotFound(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
|
@ -208,7 +246,7 @@ func TestGetReaderNotFinished(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
|
@ -236,13 +274,14 @@ func TestFinishUpload(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -256,6 +295,22 @@ func TestFinishUpload(t *testing.T) {
|
|||
PartNumber: aws.Int64(2),
|
||||
},
|
||||
},
|
||||
NextPartNumberMarker: aws.Int64(2),
|
||||
IsTruncated: aws.Bool(true),
|
||||
}, nil),
|
||||
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(2),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
Size: aws.Int64(100),
|
||||
ETag: aws.String("foobar"),
|
||||
PartNumber: aws.Int64(3),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
s3obj.EXPECT().CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
|
@ -271,6 +326,10 @@ func TestFinishUpload(t *testing.T) {
|
|||
ETag: aws.String("bar"),
|
||||
PartNumber: aws.Int64(2),
|
||||
},
|
||||
{
|
||||
ETag: aws.String("foobar"),
|
||||
PartNumber: aws.Int64(3),
|
||||
},
|
||||
},
|
||||
},
|
||||
}).Return(nil, nil),
|
||||
|
@ -286,9 +345,11 @@ func TestWriteChunk(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store.MaxPartSize = 4
|
||||
store.MinPartSize = 2
|
||||
store := New("bucket", s3obj)
|
||||
store.MaxPartSize = 8
|
||||
store.MinPartSize = 4
|
||||
store.MaxMultipartParts = 10000
|
||||
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
|
@ -301,6 +362,7 @@ func TestWriteChunk(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -315,6 +377,7 @@ func TestWriteChunk(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -344,13 +407,15 @@ func TestWriteChunk(t *testing.T) {
|
|||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumber: aws.Int64(5),
|
||||
Body: bytes.NewReader([]byte("90")),
|
||||
Body: bytes.NewReader([]byte("90AB")),
|
||||
})).Return(nil, nil),
|
||||
)
|
||||
|
||||
bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890")))
|
||||
// The last bytes "CD" will be ignored, as they are not the last bytes of the
|
||||
// upload (500 bytes total) and not of full part-size.
|
||||
bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD")))
|
||||
assert.Nil(err)
|
||||
assert.Equal(int64(10), bytesRead)
|
||||
assert.Equal(int64(12), bytesRead)
|
||||
}
|
||||
|
||||
func TestWriteChunkDropTooSmall(t *testing.T) {
|
||||
|
@ -359,7 +424,7 @@ func TestWriteChunkDropTooSmall(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
|
||||
|
@ -372,6 +437,7 @@ func TestWriteChunkDropTooSmall(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -386,6 +452,7 @@ func TestWriteChunkDropTooSmall(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -409,7 +476,7 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
store.MinPartSize = 20
|
||||
|
||||
gomock.InOrder(
|
||||
|
@ -423,6 +490,7 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -437,6 +505,7 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
@ -470,7 +539,7 @@ func TestTerminate(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
// Order is not important in this situation.
|
||||
s3obj.EXPECT().AbortMultipartUpload(&s3.AbortMultipartUploadInput{
|
||||
|
@ -504,7 +573,7 @@ func TestTerminateWithErrors(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
// Order is not important in this situation.
|
||||
// NoSuchUpload errors should be ignored
|
||||
|
@ -547,7 +616,7 @@ func TestConcatUploads(t *testing.T) {
|
|||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
s3obj.EXPECT().UploadPartCopy(&s3.UploadPartCopyInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
|
@ -579,6 +648,7 @@ func TestConcatUploads(t *testing.T) {
|
|||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package s3store_test
|
||||
package s3store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
|
Loading…
Reference in New Issue