S3Store: Add support for creation-defer-length extension (#219)

* Add HeadObject function to S3API

* Regenerate S3API mock

* Include incomplete part size in the offset

* Add CRUD functions for managing incomplete parts

* Account for incomplete parts in S3Store's Terminate

* Account for incomplete parts in S3Store's WriteChunk

* Factor out writeInfo function

* Declare support for deferred length in S3Store

* Add test for S3Store's DeclareLength

* Adapt S3Store tests to new implementation

* Add PutObjectInputMatcher test helper

* Add test for prepending incomplete parts

* Add GetInfo test for incomplete parts

* Update S3Store docs

* Consistently handle NoSuchKey errors from S3

* Handle both 403 and 404 responses from HeadObject

If the IAM role doesn't have permission to list the contents of the bucket, then HEAD requests will return 403 for nonexistent objects.
This commit is contained in:
Adam Jensen 2019-01-05 03:02:42 -05:00 committed by Marius
parent 36526ef03d
commit 33d12533e7
4 changed files with 479 additions and 44 deletions

View File

@ -53,21 +53,10 @@
// //
// In order to support tus' principle of resumable upload, S3's Multipart-Uploads // In order to support tus' principle of resumable upload, S3's Multipart-Uploads
// are internally used. // are internally used.
// For each incoming PATCH request (a call to WriteChunk), a new part is uploaded
// to S3. However, each part of a multipart upload, except the last one, must
// be 5MB or bigger. This introduces a problem, since in tus' perspective
// it's totally fine to upload just a few kilobytes in a single request.
//
// Therefore, a few special conditions have been implemented:
//
// Each PATCH request must contain a body of, at least, 5MB. If the size
// is smaller than this limit, the entire request will be dropped and not
// even passed to the storage server. If your server supports a different
// limit, you can adjust this value using S3Store.MinPartSize.
// //
// When receiving a PATCH request, its body will be temporarily stored on disk. // When receiving a PATCH request, its body will be temporarily stored on disk.
// This requirement has been made to ensure the minimum size of a single part // This requirement has been made to ensure the minimum size of a single part
// and to allow the calculating of a checksum. Once the part has been uploaded // and to allow the AWS SDK to calculate a checksum. Once the part has been uploaded
// to S3, the temporary file will be removed immediately. Therefore, please // to S3, the temporary file will be removed immediately. Therefore, please
// ensure that the server running this storage backend has enough disk space // ensure that the server running this storage backend has enough disk space
// available to hold these caches. // available to hold these caches.
@ -145,6 +134,7 @@ type S3API interface {
PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error)
ListParts(input *s3.ListPartsInput) (*s3.ListPartsOutput, error) ListParts(input *s3.ListPartsInput) (*s3.ListPartsOutput, error)
UploadPart(input *s3.UploadPartInput) (*s3.UploadPartOutput, error) UploadPart(input *s3.UploadPartInput) (*s3.UploadPartOutput, error)
HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error)
GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
CreateMultipartUpload(input *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) CreateMultipartUpload(input *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error)
AbortMultipartUpload(input *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) AbortMultipartUpload(input *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error)
@ -173,6 +163,7 @@ func (store S3Store) UseIn(composer *tusd.StoreComposer) {
composer.UseFinisher(store) composer.UseFinisher(store)
composer.UseGetReader(store) composer.UseGetReader(store)
composer.UseConcater(store) composer.UseConcater(store)
composer.UseLengthDeferrer(store)
} }
func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) { func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
@ -211,9 +202,18 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
id = uploadId + "+" + *res.UploadId id = uploadId + "+" + *res.UploadId
info.ID = id info.ID = id
err = store.writeInfo(uploadId, info)
if err != nil {
return "", fmt.Errorf("s3store: unable to create info file:\n%s", err)
}
return id, nil
}
func (store S3Store) writeInfo(uploadId string, info tusd.FileInfo) error {
infoJson, err := json.Marshal(info) infoJson, err := json.Marshal(info)
if err != nil { if err != nil {
return "", err return err
} }
// Create object on S3 containing information about the file // Create object on S3 containing information about the file
@ -223,11 +223,8 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
Body: bytes.NewReader(infoJson), Body: bytes.NewReader(infoJson),
ContentLength: aws.Int64(int64(len(infoJson))), ContentLength: aws.Int64(int64(len(infoJson))),
}) })
if err != nil {
return "", fmt.Errorf("s3store: unable to create info file:\n%s", err)
}
return id, nil return err
} }
func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
@ -243,7 +240,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
bytesUploaded := int64(0) bytesUploaded := int64(0)
optimalPartSize, err := store.calcOptimalPartSize(size) optimalPartSize, err := store.calcOptimalPartSize(size)
if err != nil { if err != nil {
return bytesUploaded, err return 0, err
} }
// Get number of parts to generate next number // Get number of parts to generate next number
@ -255,6 +252,21 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
numParts := len(parts) numParts := len(parts)
nextPartNum := int64(numParts + 1) nextPartNum := int64(numParts + 1)
incompletePartFile, incompletePartSize, err := store.downloadIncompletePartForUpload(uploadId)
if err != nil {
return 0, err
}
if incompletePartFile != nil {
defer incompletePartFile.Close()
defer os.Remove(incompletePartFile.Name())
if err := store.deleteIncompletePartForUpload(uploadId); err != nil {
return 0, err
}
src = io.MultiReader(incompletePartFile, src)
}
for { for {
// Create a temporary file to store the part in it // Create a temporary file to store the part in it
file, err := ioutil.TempFile("", "tusd-s3-tmp-") file, err := ioutil.TempFile("", "tusd-s3-tmp-")
@ -272,31 +284,32 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
} }
// If io.Copy is finished reading, it will always return (0, nil). // If io.Copy is finished reading, it will always return (0, nil).
if n == 0 { if n == 0 {
return bytesUploaded, nil return (bytesUploaded - incompletePartSize), nil
}
if !info.SizeIsDeferred {
if (size - offset) <= optimalPartSize {
if (size - offset) != n {
return bytesUploaded, nil
}
} else if n < optimalPartSize {
return bytesUploaded, nil
}
} }
// Seek to the beginning of the file // Seek to the beginning of the file
file.Seek(0, 0) file.Seek(0, 0)
_, err = store.Service.UploadPart(&s3.UploadPartInput{ isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n)
Bucket: aws.String(store.Bucket), if n >= store.MinPartSize || isFinalChunk {
Key: store.keyWithPrefix(uploadId), _, err = store.Service.UploadPart(&s3.UploadPartInput{
UploadId: aws.String(multipartId), Bucket: aws.String(store.Bucket),
PartNumber: aws.Int64(nextPartNum), Key: store.keyWithPrefix(uploadId),
Body: file, UploadId: aws.String(multipartId),
}) PartNumber: aws.Int64(nextPartNum),
if err != nil { Body: file,
return bytesUploaded, err })
if err != nil {
return bytesUploaded, err
}
} else {
if err := store.putIncompletePartForUpload(uploadId, file); err != nil {
return bytesUploaded, err
}
bytesUploaded += n
return (bytesUploaded - incompletePartSize), nil
} }
offset += n offset += n
@ -346,6 +359,21 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
offset += *part.Size offset += *part.Size
} }
headResult, err := store.Service.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId + ".part"),
})
if err != nil {
if !isAwsError(err, s3.ErrCodeNoSuchKey) && !isAwsError(err, "AccessDenied") {
return info, err
}
err = nil
}
if headResult != nil && headResult.ContentLength != nil {
offset += *headResult.ContentLength
}
info.Offset = offset info.Offset = offset
return return
@ -415,7 +443,7 @@ func (store S3Store) Terminate(id string) error {
go func() { go func() {
defer wg.Done() defer wg.Done()
// Delete the info and content file // Delete the info and content files
res, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{ res, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(store.Bucket), Bucket: aws.String(store.Bucket),
Delete: &s3.Delete{ Delete: &s3.Delete{
@ -423,6 +451,9 @@ func (store S3Store) Terminate(id string) error {
{ {
Key: store.keyWithPrefix(uploadId), Key: store.keyWithPrefix(uploadId),
}, },
{
Key: store.keyWithPrefix(uploadId + ".part"),
},
{ {
Key: store.keyWithPrefix(uploadId + ".info"), Key: store.keyWithPrefix(uploadId + ".info"),
}, },
@ -524,6 +555,18 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
return store.FinishUpload(dest) return store.FinishUpload(dest)
} }
func (store S3Store) DeclareLength(id string, length int64) error {
uploadId, _ := splitIds(id)
info, err := store.GetInfo(id)
if err != nil {
return err
}
info.Size = length
info.SizeIsDeferred = false
return store.writeInfo(uploadId, info)
}
func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) { func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) {
uploadId, multipartId := splitIds(id) uploadId, multipartId := splitIds(id)
@ -551,6 +594,74 @@ func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) {
return parts, nil return parts, nil
} }
func (store S3Store) downloadIncompletePartForUpload(uploadId string) (*os.File, int64, error) {
incompleteUploadObject, err := store.getIncompletePartForUpload(uploadId)
if err != nil {
return nil, 0, err
}
if incompleteUploadObject == nil {
// We did not find an incomplete upload
return nil, 0, nil
}
defer incompleteUploadObject.Body.Close()
partFile, err := ioutil.TempFile("", "tusd-s3-tmp-")
if err != nil {
return nil, 0, err
}
n, err := io.Copy(partFile, incompleteUploadObject.Body)
if err != nil {
return nil, 0, err
}
if n < *incompleteUploadObject.ContentLength {
return nil, 0, errors.New("short read of incomplete upload")
}
_, err = partFile.Seek(0, 0)
if err != nil {
return nil, 0, err
}
return partFile, n, nil
}
func (store S3Store) getIncompletePartForUpload(uploadId string) (*s3.GetObjectOutput, error) {
obj, err := store.Service.GetObject(&s3.GetObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId + ".part"),
})
if err != nil && (isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "AccessDenied")) {
return nil, nil
}
return obj, err
}
func (store S3Store) putIncompletePartForUpload(uploadId string, r io.ReadSeeker) error {
_, err := store.Service.PutObject(&s3.PutObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId + ".part"),
Body: r,
})
return err
}
func (store S3Store) deleteIncompletePartForUpload(uploadId string) error {
_, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(store.Bucket),
Delete: &s3.Delete{
Objects: []*s3.ObjectIdentifier{
{
Key: store.keyWithPrefix(uploadId + ".part"),
},
},
},
})
return err
}
func splitIds(id string) (uploadId, multipartId string) { func splitIds(id string) (uploadId, multipartId string) {
index := strings.Index(id, "+") index := strings.Index(id, "+")
if index == -1 { if index == -1 {

View File

@ -84,6 +84,17 @@ func (_mr *_MockS3APIRecorder) GetObject(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObject", arg0) return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObject", arg0)
} }
func (_m *MockS3API) HeadObject(_param0 *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) {
ret := _m.ctrl.Call(_m, "HeadObject", _param0)
ret0, _ := ret[0].(*s3.HeadObjectOutput)
ret1, _ := ret[1].(error)
return ret0, ret1
}
func (_mr *_MockS3APIRecorder) HeadObject(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "HeadObject", arg0)
}
func (_m *MockS3API) ListParts(_param0 *s3.ListPartsInput) (*s3.ListPartsOutput, error) { func (_m *MockS3API) ListParts(_param0 *s3.ListPartsInput) (*s3.ListPartsOutput, error) {
ret := _m.ctrl.Call(_m, "ListParts", _param0) ret := _m.ctrl.Call(_m, "ListParts", _param0)
ret0, _ := ret[0].(*s3.ListPartsOutput) ret0, _ := ret[0].(*s3.ListPartsOutput)

View File

@ -202,6 +202,10 @@ func TestGetInfo(t *testing.T) {
}, },
}, },
}, nil), }, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)),
) )
info, err := store.GetInfo("uploadId+multipartId") info, err := store.GetInfo("uploadId+multipartId")
@ -213,6 +217,41 @@ func TestGetInfo(t *testing.T) {
assert.Equal("menü", info.MetaData["bar"]) assert.Equal("menü", info.MetaData["bar"])
} }
func TestGetInfoWithIncompletePart(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
gomock.InOrder(
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{
ContentLength: aws.Int64(10),
}, nil),
)
info, err := store.GetInfo("uploadId+multipartId")
assert.Nil(err)
assert.Equal(int64(10), info.Offset)
assert.Equal("uploadId+multipartId", info.ID)
}
func TestGetInfoFinished(t *testing.T) { func TestGetInfoFinished(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
@ -316,6 +355,47 @@ func TestGetReaderNotFinished(t *testing.T) {
assert.Equal("cannot stream non-finished upload", err.Error()) assert.Equal("cannot stream non-finished upload", err.Error())
} }
func TestDeclareLength(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
gomock.InOrder(
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{},
}, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{
ContentLength: aws.Int64(0),
}, nil),
s3obj.EXPECT().PutObject(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`)),
ContentLength: aws.Int64(int64(144)),
}),
)
err := store.DeclareLength("uploadId+multipartId", 500)
assert.Nil(err)
}
func TestFinishUpload(t *testing.T) { func TestFinishUpload(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
@ -421,6 +501,10 @@ func TestWriteChunk(t *testing.T) {
}, },
}, },
}, nil), }, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
@ -436,6 +520,10 @@ func TestWriteChunk(t *testing.T) {
}, },
}, },
}, nil), }, nil),
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)),
s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
@ -457,16 +545,19 @@ func TestWriteChunk(t *testing.T) {
PartNumber: aws.Int64(5), PartNumber: aws.Int64(5),
Body: bytes.NewReader([]byte("90AB")), Body: bytes.NewReader([]byte("90AB")),
})).Return(nil, nil), })).Return(nil, nil),
s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
Body: bytes.NewReader([]byte("CD")),
})).Return(nil, nil),
) )
// The last bytes "CD" will be ignored, as they are not the last bytes of the
// upload (500 bytes total) and not of full part-size.
bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD"))) bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD")))
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(12), bytesRead) assert.Equal(int64(14), bytesRead)
} }
func TestWriteChunkDropTooSmall(t *testing.T) { func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()
assert := assert.New(t) assert := assert.New(t)
@ -496,6 +587,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) {
}, },
}, },
}, nil), }, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil)),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
@ -511,11 +606,168 @@ func TestWriteChunkDropTooSmall(t *testing.T) {
}, },
}, },
}, nil), }, nil),
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)),
s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
Body: bytes.NewReader([]byte("1234567890")),
})).Return(nil, nil),
) )
bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890"))) bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890")))
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(0), bytesRead) assert.Equal(int64(10), bytesRead)
}
func TestWriteChunkPrependsIncompletePart(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
store.MaxPartSize = 8
store.MinPartSize = 4
store.MaxMultipartParts = 10000
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
gomock.InOrder(
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{
ContentLength: aws.Int64(3),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
ContentLength: aws.Int64(3),
}, nil),
s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(store.Bucket),
Delete: &s3.Delete{
Objects: []*s3.ObjectIdentifier{
{
Key: aws.String("uploadId.part"),
},
},
},
}).Return(nil, nil),
s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(1),
Body: bytes.NewReader([]byte("1234")),
})).Return(nil, nil),
s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(2),
Body: bytes.NewReader([]byte("5")),
})).Return(nil, nil),
)
bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45")))
assert.Nil(err)
assert.Equal(int64(2), bytesRead)
}
func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
store.MaxPartSize = 8
store.MinPartSize = 4
store.MaxMultipartParts = 10000
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
gomock.InOrder(
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{
ContentLength: aws.Int64(3),
}, nil),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
ContentLength: aws.Int64(3),
}, nil),
s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(store.Bucket),
Delete: &s3.Delete{
Objects: []*s3.ObjectIdentifier{
{
Key: aws.String("uploadId.part"),
},
},
},
}).Return(nil, nil),
s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(1),
Body: bytes.NewReader([]byte("1234")),
})).Return(nil, nil),
s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
Body: bytes.NewReader([]byte("5")),
})).Return(nil, nil),
)
bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45")))
assert.Nil(err)
assert.Equal(int64(2), bytesRead)
} }
func TestWriteChunkAllowTooSmallLast(t *testing.T) { func TestWriteChunkAllowTooSmallLast(t *testing.T) {
@ -549,6 +801,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
}, },
}, },
}, nil), }, nil),
s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil)),
s3obj.EXPECT().ListParts(&s3.ListPartsInput{ s3obj.EXPECT().ListParts(&s3.ListPartsInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
@ -564,6 +820,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
}, },
}, },
}, nil), }, nil),
s3obj.EXPECT().GetObject(&s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)),
s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"), Bucket: aws.String("bucket"),
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
@ -603,6 +863,9 @@ func TestTerminate(t *testing.T) {
{ {
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
}, },
{
Key: aws.String("uploadId.part"),
},
{ {
Key: aws.String("uploadId.info"), Key: aws.String("uploadId.info"),
}, },
@ -638,6 +901,9 @@ func TestTerminateWithErrors(t *testing.T) {
{ {
Key: aws.String("uploadId"), Key: aws.String("uploadId"),
}, },
{
Key: aws.String("uploadId.part"),
},
{ {
Key: aws.String("uploadId.info"), Key: aws.String("uploadId.info"),
}, },

View File

@ -55,3 +55,50 @@ func (m UploadPartInputMatcher) String() string {
m.expect.Body.Seek(0, 0) m.expect.Body.Seek(0, 0)
return fmt.Sprintf("UploadPartInput(%d: %s)", *m.expect.PartNumber, body) return fmt.Sprintf("UploadPartInput(%d: %s)", *m.expect.PartNumber, body)
} }
type PutObjectInputMatcher struct {
expect *s3.PutObjectInput
}
func NewPutObjectInputMatcher(expect *s3.PutObjectInput) gomock.Matcher {
return PutObjectInputMatcher{
expect: expect,
}
}
func (m PutObjectInputMatcher) Matches(x interface{}) bool {
input, ok := x.(*s3.PutObjectInput)
if !ok {
return false
}
inputBody := input.Body
expectBody := m.expect.Body
i, err := ioutil.ReadAll(inputBody)
if err != nil {
panic(err)
}
inputBody.Seek(0, 0)
e, err := ioutil.ReadAll(expectBody)
if err != nil {
panic(err)
}
m.expect.Body.Seek(0, 0)
if !reflect.DeepEqual(e, i) {
return false
}
input.Body = nil
m.expect.Body = nil
return reflect.DeepEqual(m.expect, input)
}
func (m PutObjectInputMatcher) String() string {
body, _ := ioutil.ReadAll(m.expect.Body)
m.expect.Body.Seek(0, 0)
return fmt.Sprintf(`PutObjectInput(Body: "%s")`, body)
}