calculating PartSize based on size of upload
simplified algorithm, respect MaxObjectSize, updated tests, go fmt
parent 1ad6187d6d
commit 381d3326cb
@@ -86,6 +86,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"math"
 	"os"
 	"regexp"
 	"strings"
@@ -128,6 +129,13 @@ type S3Store struct {
 	// in bytes. This number needs to match with the underlying S3 backend or else
 	// uploaded parts will be reject. AWS S3, for example, uses 5MB for this value.
 	MinPartSize int64
+	// MaxMultipartParts is the maximum number of parts an S3 multipart upload is
+	// allowed to have according to AWS S3 API specifications.
+	// See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+	MaxMultipartParts int64
+	// MaxObjectSize is the maximum size an S3 Object can have according to S3
+	// API specifications. See link above.
+	MaxObjectSize int64
 }
 
 type S3API interface {
@@ -146,10 +154,12 @@ type S3API interface {
 // The MaxPartSize and MinPartSize properties are set to 6 and 5MB.
 func New(bucket string, service S3API) S3Store {
 	return S3Store{
-		Bucket:      bucket,
-		Service:     service,
-		MaxPartSize: 6 * 1024 * 1024,
-		MinPartSize: 5 * 1024 * 1024,
+		Bucket:            bucket,
+		Service:           service,
+		MaxPartSize:       5 * 1024 * 1024 * 1024,
+		MinPartSize:       5 * 1024 * 1024,
+		MaxMultipartParts: 10000,
+		MaxObjectSize:     5 * 1024 * 1024 * 1024 * 1024,
 	}
 }
 
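
The new defaults mirror AWS S3's published multipart limits: 5 MB minimum part size, 5 GB maximum part size, 10,000 parts per multipart upload, and a 5 TB maximum object size. For an S3-compatible backend with different limits, the exported fields can be overridden after calling New. A minimal sketch, not part of this commit (it assumes the package is imported as "github.com/tus/tusd/s3store"; the limit values are illustrative and simply repeat the AWS defaults):

// Sketch only: adjust the limits for a backend whose multipart constraints
// differ from AWS S3's. The numbers below are example values.
func newStoreForBackend(bucket string, api s3store.S3API) s3store.S3Store {
	store := s3store.New(bucket, api)
	store.MinPartSize = 5 * 1024 * 1024                 // smallest part the backend accepts
	store.MaxPartSize = 5 * 1024 * 1024 * 1024          // largest part the backend accepts
	store.MaxMultipartParts = 10000                     // parts allowed per multipart upload
	store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 // largest object the backend accepts
	return store
}
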
@@ -224,6 +234,10 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 	size := info.Size
 	bytesUploaded := int64(0)
+	optimalPartSize := store.CalcOptimalPartSize(&size)
+	if optimalPartSize == 0 {
+		return bytesUploaded, nil
+	}
 
 	// Get number of parts to generate next number
 	parts, err := store.listAllParts(id)
@@ -243,7 +257,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 	defer os.Remove(file.Name())
 	defer file.Close()
 
-	limitedReader := io.LimitReader(src, store.MaxPartSize)
+	limitedReader := io.LimitReader(src, optimalPartSize)
 	n, err := io.Copy(file, limitedReader)
 	// io.Copy does not return io.EOF, so we not have to handle it differently.
 	if err != nil {
@@ -254,11 +268,11 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 		return bytesUploaded, nil
 	}
 
-	if (size - offset) <= store.MinPartSize {
+	if (size - offset) <= optimalPartSize {
 		if (size - offset) != n {
 			return bytesUploaded, nil
 		}
-	} else if n < store.MinPartSize {
+	} else if n < optimalPartSize {
 		return bytesUploaded, nil
 	}
 
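
The hunk above encodes the acceptance rule for a chunk: it is handed to UploadPart only if it either fills a whole part of optimalPartSize bytes, or it is the final piece of the upload and carries all remaining bytes. A standalone sketch of that condition, purely for illustration (shouldUploadPart is a hypothetical helper, not a function in this commit):

// Hypothetical helper, not part of s3store; it only restates the checks above.
func shouldUploadPart(size, offset, n, optimalPartSize int64) bool {
	if size-offset <= optimalPartSize {
		// The remaining bytes of the upload fit into a single part, so this
		// must be the last part and it has to contain all remaining bytes.
		return size-offset == n
	}
	// Otherwise only parts of the full optimal size are accepted; anything
	// smaller is dropped (the offset does not advance, so the client sends
	// those bytes again later).
	return n >= optimalPartSize
}
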
@@ -547,3 +561,28 @@ func isAwsError(err error, code string) bool {
 	}
 	return false
 }
+
+func (store S3Store) CalcOptimalPartSize(size *int64) int64 {
+	switch {
+	// We can only manage files up to MaxObjectSize, else we need to fail.
+	case *size > store.MaxObjectSize:
+		return 0
+	// When the upload is smaller than or equal to MinPartSize, we upload in just one part.
+	case *size <= store.MinPartSize:
+		return store.MinPartSize
+	// When we need 9999 parts or fewer with MinPartSize.
+	case *size/store.MinPartSize < store.MaxMultipartParts:
+		return store.MinPartSize
+	// When we can divide our upload into exactly MaxMultipartParts parts with
+	// no bytes leftover, we will not need a spare last part.
+	// Also, when MaxObjectSize is equal to MaxPartSize * MaxMultipartParts
+	// (which is not the case with the current AWS S3 API specification, but
+	// might be in the future or with other S3-aware stores), we need this in
+	// order for our multipart upload to reach the full MaxObjectSize.
+	case int64(math.Mod(float64(*size), float64(store.MaxMultipartParts))) == 0:
+		return *size / store.MaxMultipartParts
+	// In all other cases, we need a spare last piece for the remaining bytes.
+	default:
+		return *size / (store.MaxMultipartParts - 1)
+	}
+}
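
As a worked example of CalcOptimalPartSize (a sketch under the AWS defaults from New; the import path "github.com/tus/tusd/s3store" is an assumption): a 50 GiB upload cannot simply use the 5 MiB minimum, since 50 GiB / 5 MiB = 10,240 parts would exceed MaxMultipartParts, so the default branch spreads it over MaxMultipartParts-1 = 9,999 equal parts plus one small spare part.

package main

import (
	"fmt"

	"github.com/tus/tusd/s3store"
)

func main() {
	// CalcOptimalPartSize never touches the S3 service, so a nil S3API is
	// fine for this illustration.
	store := s3store.New("bucket", nil)

	size := int64(50 * 1024 * 1024 * 1024) // 50 GiB
	partSize := store.CalcOptimalPartSize(&size)

	// 50 GiB / 9999 = 5369246 bytes (~5.12 MiB): 9999 full parts plus one
	// 446-byte spare part, i.e. 10000 parts in total.
	fmt.Println(partSize)
}
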
@@ -2,7 +2,9 @@ package s3store_test
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
+	"math"
 	"testing"
 
 	"github.com/golang/mock/gomock"
@@ -24,6 +26,94 @@ var _ tusd.TerminaterDataStore = s3store.S3Store{}
 var _ tusd.FinisherDataStore = s3store.S3Store{}
 var _ tusd.ConcaterDataStore = s3store.S3Store{}
 
+func TestCalcOptimalPartSize(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := s3store.New("bucket", s3obj)
+
+	assert.Equal("bucket", store.Bucket)
+	assert.Equal(s3obj, store.Service)
+
+	var MinPartSize = store.MinPartSize
+	var MaxPartSize = store.MaxPartSize
+	var MaxMultipartParts = store.MaxMultipartParts
+	var MaxObjectSize = store.MaxObjectSize
+	// sanity check
+	if MaxObjectSize > MaxPartSize*MaxMultipartParts {
+		t.Errorf("%v parts with %v bytes each is %v bytes, which is less than MaxObjectSize=%v.\n", MaxMultipartParts, MaxPartSize, MaxMultipartParts*MaxPartSize, MaxObjectSize)
+	}
+
+	var LimitedMaxPartSize = MaxObjectSize / (MaxMultipartParts - 1)
+	// the size of the last part, when the upload has MaxObjectSize and we use
+	// LimitedMaxPartSize for uploading
+	var LastPartSize int64 = int64(math.Mod(float64(MaxObjectSize), float64(MaxMultipartParts-1)))
+
+	var optimalPartSize, equalparts, lastpartsize int64
+	var err string
+
+	// Some of these testcases are actually duplicates, as they specify the same
+	// size in bytes - two ways to describe the same thing. That is intended, in
+	// order to provide a full picture from every angle.
+	testcases := []int64{
+		1,
+		MinPartSize - 1,
+		MinPartSize,
+		MinPartSize + 1,
+		MinPartSize * 9999,
+		MinPartSize*10000 - 1,
+		MinPartSize * 10000,
+		MinPartSize*10000 + 1,
+		MinPartSize * 10001,
+		LimitedMaxPartSize*9999 - 1,
+		LimitedMaxPartSize * 9999,
+		LimitedMaxPartSize*9999 + 1,
+		LimitedMaxPartSize*9999 + LastPartSize - 1,
+		LimitedMaxPartSize*9999 + LastPartSize,
+		LimitedMaxPartSize*9999 + LastPartSize + 1,
+		MaxObjectSize - 1,
+		MaxObjectSize,
+		MaxObjectSize + 1,
+		MaxPartSize*9999 - 1,
+		MaxPartSize * 9999,
+		MaxPartSize*9999 + 1,
+		MaxPartSize*10000 - 1,
+		MaxPartSize * 10000,
+		MaxPartSize*10000 + 1,
+	}
+
+	for index, size := range testcases {
+		optimalPartSize = store.CalcOptimalPartSize(&size)
+		if size > MaxObjectSize && optimalPartSize != 0 {
+			err += fmt.Sprintf("Testcase #%v: size=%v exceeds MaxObjectSize=%v but optimalPartSize is not 0\n", index, size, MaxObjectSize)
+		}
+		if optimalPartSize*(MaxMultipartParts-1) > MaxObjectSize {
+			err += fmt.Sprintf("Testcase #%v: optimalPartSize=%v spread over MaxMultipartParts-1 parts exceeds MaxObjectSize=%v\n", index, optimalPartSize, MaxObjectSize)
+		}
+		if optimalPartSize > MaxPartSize {
+			err += fmt.Sprintf("Testcase #%v: optimalPartSize=%v exceeds MaxPartSize=%v\n", index, optimalPartSize, MaxPartSize)
+		}
+		if optimalPartSize > 0 {
+			equalparts = size / optimalPartSize
+			lastpartsize = int64(math.Mod(float64(size), float64(optimalPartSize)))
+			if optimalPartSize < MinPartSize {
+				err += fmt.Sprintf("Testcase #%v: optimalPartSize=%v is below MinPartSize=%v\n", index, optimalPartSize, MinPartSize)
+			}
+			if equalparts > 10000 {
+				err += fmt.Sprintf("Testcase #%v: max-parts=%v exceeds limit of 10,000 parts\n", index, equalparts)
+			}
+			if equalparts == 10000 && lastpartsize > 0 {
+				err += fmt.Sprintf("Testcase #%v: max-parts=%v exceeds limit of 10,000 parts\n", index, equalparts+1)
+			}
+		}
+		if len(err) > 0 {
+			t.Errorf(err)
+		}
+	}
+}
+
 func TestNewUpload(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
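
For reference, with the AWS defaults set in New, the quantities derived in this test work out as follows (hand-computed, not part of the commit):

MaxPartSize * MaxMultipartParts = 5 GiB * 10,000 ≈ 48.8 TiB, which is at least MaxObjectSize = 5 TiB, so the sanity check passes.
LimitedMaxPartSize = MaxObjectSize / (MaxMultipartParts - 1) = 5 TiB / 9,999 = 549,810,794 bytes (≈ 524 MiB).
LastPartSize = MaxObjectSize mod (MaxMultipartParts - 1) = 9,674 bytes.
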
@@ -324,8 +414,10 @@ func TestWriteChunk(t *testing.T) {
 	s3obj := NewMockS3API(mockCtrl)
 	store := s3store.New("bucket", s3obj)
-	store.MaxPartSize = 4
-	store.MinPartSize = 2
+	store.MaxPartSize = 8
+	store.MinPartSize = 4
+	store.MaxMultipartParts = 10000
+	store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
 
 	gomock.InOrder(
 		s3obj.EXPECT().GetObject(&s3.GetObjectInput{
@@ -383,13 +475,15 @@ func TestWriteChunk(t *testing.T) {
 			Key:        aws.String("uploadId"),
 			UploadId:   aws.String("multipartId"),
 			PartNumber: aws.Int64(5),
-			Body:       bytes.NewReader([]byte("90")),
+			Body:       bytes.NewReader([]byte("90AB")),
 		})).Return(nil, nil),
 	)
 
-	bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890")))
+	// The last bytes "CD" will be ignored, as they are not the last bytes of the
+	// upload (500 bytes total) and do not fill a whole part.
+	bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD")))
 	assert.Nil(err)
-	assert.Equal(int64(10), bytesRead)
+	assert.Equal(int64(12), bytesRead)
 }
 
 func TestWriteChunkDropTooSmall(t *testing.T) {
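
Reading the updated expectations: with size = 500, offset = 300, and MinPartSize = 4, CalcOptimalPartSize returns 4 (500 / 4 = 125 parts, well below MaxMultipartParts), so the 14-byte reader yields three full 4-byte parts ("1234", "5678", "90AB") for 12 bytes uploaded, while the trailing "CD" is neither a full part nor the end of the upload and is dropped.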