diff --git a/s3store/s3store.go b/s3store/s3store.go
index ca0cc4e..28e6a08 100644
--- a/s3store/s3store.go
+++ b/s3store/s3store.go
@@ -128,6 +128,13 @@ type S3Store struct {
 	// in bytes. This number needs to match with the underlying S3 backend or else
 	// uploaded parts will be rejected. AWS S3, for example, uses 5MB for this value.
 	MinPartSize int64
+	// MaxMultipartParts is the maximum number of parts an S3 multipart upload is
+	// allowed to have according to the AWS S3 API specification.
+	// See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
+	MaxMultipartParts int64
+	// MaxObjectSize is the maximum size an S3 object can have according to the
+	// S3 API specification. See the link above.
+	MaxObjectSize int64
 }
 
 type S3API interface {
@@ -146,10 +153,13 @@ type S3API interface {
-// The MaxPartSize and MinPartSize properties are set to 6 and 5MB.
+// The MaxPartSize, MinPartSize, MaxMultipartParts and MaxObjectSize
+// properties are set to the limits of AWS S3 (5GB, 5MB, 10000 parts, 5TB).
 func New(bucket string, service S3API) S3Store {
 	return S3Store{
-		Bucket:      bucket,
-		Service:     service,
-		MaxPartSize: 6 * 1024 * 1024,
-		MinPartSize: 5 * 1024 * 1024,
+		Bucket:            bucket,
+		Service:           service,
+		MaxPartSize:       5 * 1024 * 1024 * 1024,
+		MinPartSize:       5 * 1024 * 1024,
+		MaxMultipartParts: 10000,
+		MaxObjectSize:     5 * 1024 * 1024 * 1024 * 1024,
 	}
 }
@@ -224,6 +234,12 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 	size := info.Size
 	bytesUploaded := int64(0)
+	optimalPartSize := store.CalcOptimalPartSize(&size)
+	// A part size of zero means the upload exceeds MaxObjectSize and can never
+	// be completed, so no bytes are consumed from src.
+	if optimalPartSize == 0 {
+		return bytesUploaded, nil
+	}
 
 	// Get number of parts to generate next number
 	parts, err := store.listAllParts(id)
	if err != nil {
@@ -243,7 +259,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 	defer os.Remove(file.Name())
 	defer file.Close()
 
-	limitedReader := io.LimitReader(src, store.MaxPartSize)
+	limitedReader := io.LimitReader(src, optimalPartSize)
 	n, err := io.Copy(file, limitedReader)
 	// io.Copy does not return io.EOF, so we do not have to handle it differently.
 	if err != nil {
@@ -254,11 +270,11 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 		return bytesUploaded, nil
 	}
 
-	if (size - offset) <= store.MinPartSize {
+	if (size - offset) <= optimalPartSize {
 		if (size - offset) != n {
 			return bytesUploaded, nil
 		}
-	} else if n < store.MinPartSize {
+	} else if n < optimalPartSize {
 		return bytesUploaded, nil
 	}
 
@@ -547,3 +563,30 @@ func isAwsError(err error, code string) bool {
 	}
 	return false
 }
+
+func (store S3Store) CalcOptimalPartSize(size *int64) int64 {
+	switch {
+	// We can only manage uploads up to MaxObjectSize; anything larger must fail.
+	case *size > store.MaxObjectSize:
+		return 0
+	// When the upload is smaller than or equal to MinPartSize, we upload it in
+	// just one part.
+	case *size <= store.MinPartSize:
+		return store.MinPartSize
+	// When MinPartSize yields at most MaxMultipartParts parts (the last part
+	// may be smaller than MinPartSize), we stick with MinPartSize.
+	case *size/store.MinPartSize < store.MaxMultipartParts:
+		return store.MinPartSize
+	// When we can divide our upload into exactly MaxMultipartParts parts with
+	// no bytes left over, we will not need a spare last part. Also, when
+	// MaxObjectSize is equal to MaxPartSize * MaxMultipartParts (which is not
+	// the case with the current AWS S3 API specification, but might be in the
+	// future or with other S3-compatible stores), we need this case for a
+	// multipart upload to reach the full MaxObjectSize.
+	case *size%store.MaxMultipartParts == 0:
+		return *size / store.MaxMultipartParts
+	// In all other cases, we need a spare last part for the remaining bytes.
+	default:
+		return *size / (store.MaxMultipartParts - 1)
+	}
+}
diff --git a/s3store/s3store_test.go b/s3store/s3store_test.go
index b954b18..1ae264b 100644
--- a/s3store/s3store_test.go
+++ b/s3store/s3store_test.go
@@ -2,7 +2,8 @@ package s3store_test
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
 	"testing"
 
 	"github.com/golang/mock/gomock"
@@ -24,6 +25,95 @@ var _ tusd.TerminaterDataStore = s3store.S3Store{}
 var _ tusd.FinisherDataStore = s3store.S3Store{}
 var _ tusd.ConcaterDataStore = s3store.S3Store{}
 
+func TestCalcOptimalPartSize(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := s3store.New("bucket", s3obj)
+
+	assert.Equal("bucket", store.Bucket)
+	assert.Equal(s3obj, store.Service)
+
+	var MinPartSize = store.MinPartSize
+	var MaxPartSize = store.MaxPartSize
+	var MaxMultipartParts = store.MaxMultipartParts
+	var MaxObjectSize = store.MaxObjectSize
+	// Sanity check: the store must be able to reach MaxObjectSize at all.
+	if MaxObjectSize > MaxPartSize*MaxMultipartParts {
+		t.Errorf("%v parts with %v bytes each is %v bytes, which is less than MaxObjectSize=%v.\n", MaxMultipartParts, MaxPartSize, MaxMultipartParts*MaxPartSize, MaxObjectSize)
+	}
+
+	// The largest part size CalcOptimalPartSize will ever pick.
+	var LimitedMaxPartSize = MaxObjectSize / (MaxMultipartParts - 1)
+	// The size of the last part when the upload has MaxObjectSize and
+	// LimitedMaxPartSize is used for all parts before it.
+	var LastPartSize = MaxObjectSize % (MaxMultipartParts - 1)
+
+	var optimalPartSize, equalParts, lastPartSize int64
+	var errs string
+
+	// Some of these test cases are actually duplicates, as they specify the
+	// same size in bytes - two ways to describe the same thing. That is
+	// intentional, in order to provide a full picture from every angle.
+	testcases := []int64{
+		1,
+		MinPartSize - 1,
+		MinPartSize,
+		MinPartSize + 1,
+		MinPartSize * 9999,
+		MinPartSize*10000 - 1,
+		MinPartSize * 10000,
+		MinPartSize*10000 + 1,
+		MinPartSize * 10001,
+		LimitedMaxPartSize*9999 - 1,
+		LimitedMaxPartSize * 9999,
+		LimitedMaxPartSize*9999 + 1,
+		LimitedMaxPartSize*9999 + LastPartSize - 1,
+		LimitedMaxPartSize*9999 + LastPartSize,
+		LimitedMaxPartSize*9999 + LastPartSize + 1,
+		MaxObjectSize - 1,
+		MaxObjectSize,
+		MaxObjectSize + 1,
+		MaxPartSize*9999 - 1,
+		MaxPartSize * 9999,
+		MaxPartSize*9999 + 1,
+		MaxPartSize*10000 - 1,
+		MaxPartSize * 10000,
+		MaxPartSize*10000 + 1,
+	}
+
+	for index, size := range testcases {
+		optimalPartSize = store.CalcOptimalPartSize(&size)
+		if size > MaxObjectSize && optimalPartSize != 0 {
+			errs += fmt.Sprintf("Testcase #%v: size=%v exceeds MaxObjectSize=%v but optimalPartSize is not 0\n", index, size, MaxObjectSize)
+		}
+		if optimalPartSize*(MaxMultipartParts-1) > MaxObjectSize {
+			errs += fmt.Sprintf("Testcase #%v: optimalPartSize=%v exceeds LimitedMaxPartSize=%v\n", index, optimalPartSize, LimitedMaxPartSize)
+		}
+		if optimalPartSize > MaxPartSize {
+			errs += fmt.Sprintf("Testcase #%v: optimalPartSize=%v exceeds MaxPartSize=%v\n", index, optimalPartSize, MaxPartSize)
+		}
+		if optimalPartSize > 0 {
+			equalParts = size / optimalPartSize
+			lastPartSize = size % optimalPartSize
+			if optimalPartSize < MinPartSize {
+				errs += fmt.Sprintf("Testcase #%v: optimalPartSize=%v is below MinPartSize=%v\n", index, optimalPartSize, MinPartSize)
+			}
+			if equalParts > MaxMultipartParts {
+				errs += fmt.Sprintf("Testcase #%v: %v parts exceed the limit of %v parts\n", index, equalParts, MaxMultipartParts)
+			}
+			if equalParts == MaxMultipartParts && lastPartSize > 0 {
+				errs += fmt.Sprintf("Testcase #%v: %v parts exceed the limit of %v parts\n", index, equalParts+1, MaxMultipartParts)
+			}
+		}
+	}
+	if len(errs) > 0 {
+		t.Errorf("%s", errs)
+	}
+}
+
 func TestNewUpload(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
@@ -324,8 +414,10 @@ func TestWriteChunk(t *testing.T) {
 	s3obj := NewMockS3API(mockCtrl)
 	store := s3store.New("bucket", s3obj)
-	store.MaxPartSize = 4
-	store.MinPartSize = 2
+	store.MaxPartSize = 8
+	store.MinPartSize = 4
+	store.MaxMultipartParts = 10000
+	store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
 
 	gomock.InOrder(
 		s3obj.EXPECT().GetObject(&s3.GetObjectInput{
@@ -383,13 +475,15 @@ func TestWriteChunk(t *testing.T) {
 			Key:        aws.String("uploadId"),
 			UploadId:   aws.String("multipartId"),
 			PartNumber: aws.Int64(5),
-			Body:       bytes.NewReader([]byte("90")),
+			Body:       bytes.NewReader([]byte("90AB")),
 		})).Return(nil, nil),
 	)
 
-	bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890")))
+	// The last bytes "CD" will be ignored, as they are neither the final bytes
+	// of the upload (500 bytes total) nor do they form a full part.
+	bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD")))
 	assert.Nil(err)
-	assert.Equal(int64(10), bytesRead)
+	assert.Equal(int64(12), bytesRead)
 }
 
 func TestWriteChunkDropTooSmall(t *testing.T) {
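For illustration, here is a standalone sketch that is not part of the patch: it mirrors the part-size selection above with the AWS limits used by s3store.New (the lowercase constants and the main harness are assumptions made for this example only) and prints the chosen part size at the interesting boundaries:

package main

import "fmt"

// Assumed values, mirroring the AWS S3 limits set in s3store.New above.
const (
	minPartSize       int64 = 5 * 1024 * 1024
	maxMultipartParts int64 = 10000
	maxObjectSize     int64 = 5 * 1024 * 1024 * 1024 * 1024
)

// calcOptimalPartSize reproduces the selection logic of CalcOptimalPartSize.
func calcOptimalPartSize(size int64) int64 {
	switch {
	case size > maxObjectSize:
		return 0 // upload can never be stored
	case size <= minPartSize:
		return minPartSize // a single part suffices
	case size/minPartSize < maxMultipartParts:
		return minPartSize // MinPartSize stays within the part limit
	case size%maxMultipartParts == 0:
		return size / maxMultipartParts // divides evenly, no spare part needed
	default:
		return size / (maxMultipartParts - 1) // spare last part takes the rest
	}
}

func main() {
	for _, size := range []int64{
		1024,                  // far below MinPartSize: 5MB parts
		minPartSize * 10000,   // largest size still served by 5MB parts
		minPartSize*10000 + 1, // first size that needs larger parts
		maxObjectSize,         // the maximum storable object
		maxObjectSize + 1,     // too large: part size 0
	} {
		fmt.Printf("size=%14d -> partSize=%d\n", size, calcOptimalPartSize(size))
	}
}

Note the design choice in the default case: dividing by MaxMultipartParts - 1 rather than MaxMultipartParts leaves room for one spare last part, which absorbs the remainder that integer division truncates away.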