From 1ad6187d6dc199018605e3e3a7d9d4c7d2c37cf8 Mon Sep 17 00:00:00 2001 From: koenvo Date: Thu, 17 Aug 2017 21:31:37 +0200 Subject: [PATCH] Take IsTruncated field of S3 ListParts API response into account (#148) * Take IsTruncated field of S3 ListParts API response into account * Rename s3store.ListParts to ListAllParts * Use proper formatting + make listAllParts private + test listAllParts through TestGetInfo * Update TestFinishUpload to also test paged ListParts response --- s3store/s3store.go | 63 ++++++++++++++---------- s3store/s3store_test.go | 106 ++++++++++++++++++++++++++++------------ 2 files changed, 112 insertions(+), 57 deletions(-) diff --git a/s3store/s3store.go b/s3store/s3store.go index 69f5c1a..ca0cc4e 100644 --- a/s3store/s3store.go +++ b/s3store/s3store.go @@ -226,17 +226,12 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, bytesUploaded := int64(0) // Get number of parts to generate next number - listPtr, err := store.Service.ListParts(&s3.ListPartsInput{ - Bucket: aws.String(store.Bucket), - Key: aws.String(uploadId), - UploadId: aws.String(multipartId), - }) + parts, err := store.listAllParts(id) if err != nil { return 0, err } - list := *listPtr - numParts := len(list.Parts) + numParts := len(parts) nextPartNum := int64(numParts + 1) for { @@ -288,7 +283,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, } func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) { - uploadId, multipartId := splitIds(id) + uploadId, _ := splitIds(id) // Get file info stored in separate object res, err := store.Service.GetObject(&s3.GetObjectInput{ @@ -308,11 +303,7 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) { } // Get uploaded parts and their offset - listPtr, err := store.Service.ListParts(&s3.ListPartsInput{ - Bucket: aws.String(store.Bucket), - Key: aws.String(uploadId), - UploadId: aws.String(multipartId), - }) + parts, err := store.listAllParts(id) if err != nil { // Check if the error is caused by the upload not being found. This happens // when the multipart upload has already been completed or aborted. Since @@ -326,11 +317,9 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) { } } - list := *listPtr - offset := int64(0) - for _, part := range list.Parts { + for _, part := range parts { offset += *part.Size } @@ -444,22 +433,17 @@ func (store S3Store) FinishUpload(id string) error { uploadId, multipartId := splitIds(id) // Get uploaded parts - listPtr, err := store.Service.ListParts(&s3.ListPartsInput{ - Bucket: aws.String(store.Bucket), - Key: aws.String(uploadId), - UploadId: aws.String(multipartId), - }) + parts, err := store.listAllParts(id) if err != nil { return err } // Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next // request. - list := *listPtr - parts := make([]*s3.CompletedPart, len(list.Parts)) + completedParts := make([]*s3.CompletedPart, len(parts)) - for index, part := range list.Parts { - parts[index] = &s3.CompletedPart{ + for index, part := range parts { + completedParts[index] = &s3.CompletedPart{ ETag: part.ETag, PartNumber: part.PartNumber, } @@ -470,7 +454,7 @@ func (store S3Store) FinishUpload(id string) error { Key: aws.String(uploadId), UploadId: aws.String(multipartId), MultipartUpload: &s3.CompletedMultipartUpload{ - Parts: parts, + Parts: completedParts, }, }) @@ -517,6 +501,33 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error { return store.FinishUpload(dest) } +func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) { + uploadId, multipartId := splitIds(id) + + partMarker := int64(0) + for { + // Get uploaded parts + listPtr, err := store.Service.ListParts(&s3.ListPartsInput{ + Bucket: aws.String(store.Bucket), + Key: aws.String(uploadId), + UploadId: aws.String(multipartId), + PartNumberMarker: aws.Int64(partMarker), + }) + if err != nil { + return nil, err + } + + parts = append(parts, (*listPtr).Parts...) + + if listPtr.IsTruncated != nil && *listPtr.IsTruncated { + partMarker = *listPtr.NextPartNumberMarker + } else { + break + } + } + return parts, nil +} + func splitIds(id string) (uploadId, multipartId string) { index := strings.Index(id, "+") if index == -1 { diff --git a/s3store/s3store_test.go b/s3store/s3store_test.go index 2debfa5..b954b18 100644 --- a/s3store/s3store_test.go +++ b/s3store/s3store_test.go @@ -104,9 +104,10 @@ func TestGetInfo(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menĂ¼","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -116,13 +117,27 @@ func TestGetInfo(t *testing.T) { Size: aws.Int64(200), }, }, + NextPartNumberMarker: aws.Int64(2), + IsTruncated: aws.Bool(true), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(2), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), + }, + }, }, nil), ) info, err := store.GetInfo("uploadId+multipartId") assert.Nil(err) assert.Equal(int64(500), info.Size) - assert.Equal(int64(300), info.Offset) + assert.Equal(int64(400), info.Offset) assert.Equal("uploadId+multipartId", info.ID) assert.Equal("hello", info.MetaData["foo"]) assert.Equal("menĂ¼", info.MetaData["bar"]) @@ -144,9 +159,10 @@ func TestGetInfoFinished(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)), ) @@ -240,9 +256,10 @@ func TestFinishUpload(t *testing.T) { gomock.InOrder( s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -256,6 +273,22 @@ func TestFinishUpload(t *testing.T) { PartNumber: aws.Int64(2), }, }, + NextPartNumberMarker: aws.Int64(2), + IsTruncated: aws.Bool(true), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(2), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), + ETag: aws.String("foobar"), + PartNumber: aws.Int64(3), + }, + }, }, nil), s3obj.EXPECT().CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ Bucket: aws.String("bucket"), @@ -271,6 +304,10 @@ func TestFinishUpload(t *testing.T) { ETag: aws.String("bar"), PartNumber: aws.Int64(2), }, + { + ETag: aws.String("foobar"), + PartNumber: aws.Int64(3), + }, }, }, }).Return(nil, nil), @@ -298,9 +335,10 @@ func TestWriteChunk(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -312,9 +350,10 @@ func TestWriteChunk(t *testing.T) { }, }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -369,9 +408,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -383,9 +423,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) { }, }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -420,9 +461,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -434,9 +476,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { }, }, nil), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { @@ -576,9 +619,10 @@ func TestConcatUploads(t *testing.T) { // Output from s3Store.FinishUpload gomock.InOrder( s3obj.EXPECT().ListParts(&s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ {