Take IsTruncated field of S3 ListParts API response into account (#148)

* Take IsTruncated field of S3 ListParts API response into account

* Rename s3store.ListParts to ListAllParts

* Use proper formatting + make listAllParts private + test listAllParts through TestGetInfo

* Update TestFinishUpload to also test paged ListParts response
This commit is contained in:
koenvo 2017-08-17 21:31:37 +02:00 committed by Marius
parent a51f5994bb
commit 1ad6187d6d
2 changed files with 112 additions and 57 deletions

View File

@ -226,17 +226,12 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
bytesUploaded := int64(0) bytesUploaded := int64(0)
// Get number of parts to generate next number // Get number of parts to generate next number
listPtr, err := store.Service.ListParts(&s3.ListPartsInput{ parts, err := store.listAllParts(id)
Bucket: aws.String(store.Bucket),
Key: aws.String(uploadId),
UploadId: aws.String(multipartId),
})
if err != nil { if err != nil {
return 0, err return 0, err
} }
list := *listPtr numParts := len(parts)
numParts := len(list.Parts)
nextPartNum := int64(numParts + 1) nextPartNum := int64(numParts + 1)
for { for {
@ -288,7 +283,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
} }
func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) { func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
uploadId, multipartId := splitIds(id) uploadId, _ := splitIds(id)
// Get file info stored in separate object // Get file info stored in separate object
res, err := store.Service.GetObject(&s3.GetObjectInput{ res, err := store.Service.GetObject(&s3.GetObjectInput{
@ -308,11 +303,7 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
} }
// Get uploaded parts and their offset // Get uploaded parts and their offset
listPtr, err := store.Service.ListParts(&s3.ListPartsInput{ parts, err := store.listAllParts(id)
Bucket: aws.String(store.Bucket),
Key: aws.String(uploadId),
UploadId: aws.String(multipartId),
})
if err != nil { if err != nil {
// Check if the error is caused by the upload not being found. This happens // Check if the error is caused by the upload not being found. This happens
// when the multipart upload has already been completed or aborted. Since // when the multipart upload has already been completed or aborted. Since
@ -326,11 +317,9 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
} }
} }
list := *listPtr
offset := int64(0) offset := int64(0)
for _, part := range list.Parts { for _, part := range parts {
offset += *part.Size offset += *part.Size
} }
@ -444,22 +433,17 @@ func (store S3Store) FinishUpload(id string) error {
uploadId, multipartId := splitIds(id) uploadId, multipartId := splitIds(id)
// Get uploaded parts // Get uploaded parts
listPtr, err := store.Service.ListParts(&s3.ListPartsInput{ parts, err := store.listAllParts(id)
Bucket: aws.String(store.Bucket),
Key: aws.String(uploadId),
UploadId: aws.String(multipartId),
})
if err != nil { if err != nil {
return err return err
} }
// Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next // Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next
// request. // request.
list := *listPtr completedParts := make([]*s3.CompletedPart, len(parts))
parts := make([]*s3.CompletedPart, len(list.Parts))
for index, part := range list.Parts { for index, part := range parts {
parts[index] = &s3.CompletedPart{ completedParts[index] = &s3.CompletedPart{
ETag: part.ETag, ETag: part.ETag,
PartNumber: part.PartNumber, PartNumber: part.PartNumber,
} }
@ -470,7 +454,7 @@ func (store S3Store) FinishUpload(id string) error {
Key: aws.String(uploadId), Key: aws.String(uploadId),
UploadId: aws.String(multipartId), UploadId: aws.String(multipartId),
MultipartUpload: &s3.CompletedMultipartUpload{ MultipartUpload: &s3.CompletedMultipartUpload{
Parts: parts, Parts: completedParts,
}, },
}) })
@ -517,6 +501,33 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
return store.FinishUpload(dest) return store.FinishUpload(dest)
} }
// listAllParts returns the complete list of parts belonging to the multipart
// upload identified by id. It follows the PartNumberMarker-based pagination
// of the S3 ListParts API, issuing additional requests for as long as the
// response reports IsTruncated, so that uploads with more parts than one
// ListParts page can hold are fully enumerated.
func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) {
	uploadId, multipartId := splitIds(id)

	partMarker := int64(0)
	for {
		// Fetch the next page of uploaded parts, starting after partMarker.
		listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
			Bucket:           aws.String(store.Bucket),
			Key:              aws.String(uploadId),
			UploadId:         aws.String(multipartId),
			PartNumberMarker: aws.Int64(partMarker),
		})
		if err != nil {
			return nil, err
		}

		// Idiomatic field access; Go auto-dereferences, so (*listPtr).Parts
		// is redundant.
		parts = append(parts, listPtr.Parts...)

		if listPtr.IsTruncated != nil && *listPtr.IsTruncated {
			// More pages remain; continue after the marker S3 handed back.
			partMarker = *listPtr.NextPartNumberMarker
		} else {
			break
		}
	}
	return parts, nil
}
func splitIds(id string) (uploadId, multipartId string) { func splitIds(id string) (uploadId, multipartId string) {
index := strings.Index(id, "+") index := strings.Index(id, "+")
if index == -1 { if index == -1 {

View File

@ -104,9 +104,10 @@ func TestGetInfo(t *testing.T) {
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -116,13 +117,27 @@ func TestGetInfo(t *testing.T) {
Size: aws.Int64(200), Size: aws.Int64(200),
}, },
}, },
NextPartNumberMarker: aws.Int64(2),
IsTruncated: aws.Bool(true),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(2),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
Size: aws.Int64(100),
},
},
}, nil), }, nil),
) )
info, err := store.GetInfo("uploadId+multipartId") info, err := store.GetInfo("uploadId+multipartId")
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(500), info.Size) assert.Equal(int64(500), info.Size)
assert.Equal(int64(300), info.Offset) assert.Equal(int64(400), info.Offset)
assert.Equal("uploadId+multipartId", info.ID) assert.Equal("uploadId+multipartId", info.ID)
assert.Equal("hello", info.MetaData["foo"]) assert.Equal("hello", info.MetaData["foo"])
assert.Equal("menü", info.MetaData["bar"]) assert.Equal("menü", info.MetaData["bar"])
@ -144,9 +159,10 @@ func TestGetInfoFinished(t *testing.T) {
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)), }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)),
) )
@ -240,9 +256,10 @@ func TestFinishUpload(t *testing.T) {
gomock.InOrder( gomock.InOrder(
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -256,6 +273,22 @@ func TestFinishUpload(t *testing.T) {
PartNumber: aws.Int64(2), PartNumber: aws.Int64(2),
}, },
}, },
NextPartNumberMarker: aws.Int64(2),
IsTruncated: aws.Bool(true),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(2),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
Size: aws.Int64(100),
ETag: aws.String("foobar"),
PartNumber: aws.Int64(3),
},
},
}, nil), }, nil),
s3obj.EXPECT().CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ s3obj.EXPECT().CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
@ -271,6 +304,10 @@ func TestFinishUpload(t *testing.T) {
ETag: aws.String("bar"), ETag: aws.String("bar"),
PartNumber: aws.Int64(2), PartNumber: aws.Int64(2),
}, },
{
ETag: aws.String("foobar"),
PartNumber: aws.Int64(3),
},
}, },
}, },
}).Return(nil, nil), }).Return(nil, nil),
@ -298,9 +335,10 @@ func TestWriteChunk(t *testing.T) {
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -312,9 +350,10 @@ func TestWriteChunk(t *testing.T) {
}, },
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -369,9 +408,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) {
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -383,9 +423,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) {
}, },
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -420,9 +461,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -434,9 +476,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
}, },
}, nil), }, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {
@ -576,9 +619,10 @@ func TestConcatUploads(t *testing.T) {
// Output from s3Store.FinishUpload // Output from s3Store.FinishUpload
gomock.InOrder( gomock.InOrder(
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"), UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{ }).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{ Parts: []*s3.Part{
{ {