From 33d12533e79db367bbd432ad912b84b0bdd4ce32 Mon Sep 17 00:00:00 2001 From: Adam Jensen Date: Sat, 5 Jan 2019 03:02:42 -0500 Subject: [PATCH] S3Store: Add support for creation-defer-length extension (#219) * Add HeadObject function to S3API * Regenerate S3API mock * Include incomplete part size in the offset * Add CRUD functions for managing incomplete parts * Account for incomplete parts in S3Store's Terminate * Account for incomplete parts in S3Store's WriteChunk * Factor out writeInfo function * Declare support for deferred length in S3Store * Add test for S3Store's DeclareLength * Adapt S3Store tests to new implementation * Add PutObjectInputMatcher test helper * Add test for prepending incomplete parts * Add GetInfo test for incomplete parts * Update S3Store docs * Consistently handle NoSuchKey errors from S3 * Handle both 403 and 404 responses from HeadObject If the IAM role doesn't have permission to list the contents of the bucket, then HEAD requests will return 403 for nonexistent objects. --- s3store/s3store.go | 189 +++++++++++++++++++----- s3store/s3store_mock_test.go | 11 ++ s3store/s3store_test.go | 276 ++++++++++++++++++++++++++++++++++- s3store/s3store_util_test.go | 47 ++++++ 4 files changed, 479 insertions(+), 44 deletions(-) diff --git a/s3store/s3store.go b/s3store/s3store.go index 1beab11..1f87714 100644 --- a/s3store/s3store.go +++ b/s3store/s3store.go @@ -53,21 +53,10 @@ // // In order to support tus' principle of resumable upload, S3's Multipart-Uploads // are internally used. -// For each incoming PATCH request (a call to WriteChunk), a new part is uploaded -// to S3. However, each part of a multipart upload, except the last one, must -// be 5MB or bigger. This introduces a problem, since in tus' perspective -// it's totally fine to upload just a few kilobytes in a single request. -// -// Therefore, a few special conditions have been implemented: -// -// Each PATCH request must contain a body of, at least, 5MB. If the size -// is smaller than this limit, the entire request will be dropped and not -// even passed to the storage server. If your server supports a different -// limit, you can adjust this value using S3Store.MinPartSize. // // When receiving a PATCH request, its body will be temporarily stored on disk. // This requirement has been made to ensure the minimum size of a single part -// and to allow the calculating of a checksum. Once the part has been uploaded +// and to allow the AWS SDK to calculate a checksum. Once the part has been uploaded // to S3, the temporary file will be removed immediately. Therefore, please // ensure that the server running this storage backend has enough disk space // available to hold these caches. @@ -145,6 +134,7 @@ type S3API interface { PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) ListParts(input *s3.ListPartsInput) (*s3.ListPartsOutput, error) UploadPart(input *s3.UploadPartInput) (*s3.UploadPartOutput, error) + HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) CreateMultipartUpload(input *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) AbortMultipartUpload(input *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) @@ -173,6 +163,7 @@ func (store S3Store) UseIn(composer *tusd.StoreComposer) { composer.UseFinisher(store) composer.UseGetReader(store) composer.UseConcater(store) + composer.UseLengthDeferrer(store) } func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) { @@ -211,9 +202,18 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) { id = uploadId + "+" + *res.UploadId info.ID = id + err = store.writeInfo(uploadId, info) + if err != nil { + return "", fmt.Errorf("s3store: unable to create info file:\n%s", err) + } + + return id, nil +} + +func (store S3Store) writeInfo(uploadId string, info tusd.FileInfo) error { infoJson, err := json.Marshal(info) if err != nil { - return "", err + return err } // Create object on S3 containing information about the file @@ -223,11 +223,8 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) { Body: bytes.NewReader(infoJson), ContentLength: aws.Int64(int64(len(infoJson))), }) - if err != nil { - return "", fmt.Errorf("s3store: unable to create info file:\n%s", err) - } - return id, nil + return err } func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { @@ -243,7 +240,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, bytesUploaded := int64(0) optimalPartSize, err := store.calcOptimalPartSize(size) if err != nil { - return bytesUploaded, err + return 0, err } // Get number of parts to generate next number @@ -255,6 +252,21 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, numParts := len(parts) nextPartNum := int64(numParts + 1) + incompletePartFile, incompletePartSize, err := store.downloadIncompletePartForUpload(uploadId) + if err != nil { + return 0, err + } + if incompletePartFile != nil { + defer incompletePartFile.Close() + defer os.Remove(incompletePartFile.Name()) + + if err := store.deleteIncompletePartForUpload(uploadId); err != nil { + return 0, err + } + + src = io.MultiReader(incompletePartFile, src) + } + for { // Create a temporary file to store the part in it file, err := ioutil.TempFile("", "tusd-s3-tmp-") @@ -272,31 +284,32 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, } // If io.Copy is finished reading, it will always return (0, nil). if n == 0 { - return bytesUploaded, nil - } - - if !info.SizeIsDeferred { - if (size - offset) <= optimalPartSize { - if (size - offset) != n { - return bytesUploaded, nil - } - } else if n < optimalPartSize { - return bytesUploaded, nil - } + return (bytesUploaded - incompletePartSize), nil } // Seek to the beginning of the file file.Seek(0, 0) - _, err = store.Service.UploadPart(&s3.UploadPartInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(uploadId), - UploadId: aws.String(multipartId), - PartNumber: aws.Int64(nextPartNum), - Body: file, - }) - if err != nil { - return bytesUploaded, err + isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n) + if n >= store.MinPartSize || isFinalChunk { + _, err = store.Service.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId), + UploadId: aws.String(multipartId), + PartNumber: aws.Int64(nextPartNum), + Body: file, + }) + if err != nil { + return bytesUploaded, err + } + } else { + if err := store.putIncompletePartForUpload(uploadId, file); err != nil { + return bytesUploaded, err + } + + bytesUploaded += n + + return (bytesUploaded - incompletePartSize), nil } offset += n @@ -346,6 +359,21 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) { offset += *part.Size } + headResult, err := store.Service.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId + ".part"), + }) + if err != nil { + if !isAwsError(err, s3.ErrCodeNoSuchKey) && !isAwsError(err, "AccessDenied") { + return info, err + } + + err = nil + } + if headResult != nil && headResult.ContentLength != nil { + offset += *headResult.ContentLength + } + info.Offset = offset return @@ -415,7 +443,7 @@ func (store S3Store) Terminate(id string) error { go func() { defer wg.Done() - // Delete the info and content file + // Delete the info and content files res, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{ Bucket: aws.String(store.Bucket), Delete: &s3.Delete{ @@ -423,6 +451,9 @@ func (store S3Store) Terminate(id string) error { { Key: store.keyWithPrefix(uploadId), }, + { + Key: store.keyWithPrefix(uploadId + ".part"), + }, { Key: store.keyWithPrefix(uploadId + ".info"), }, @@ -524,6 +555,18 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error { return store.FinishUpload(dest) } +func (store S3Store) DeclareLength(id string, length int64) error { + uploadId, _ := splitIds(id) + info, err := store.GetInfo(id) + if err != nil { + return err + } + info.Size = length + info.SizeIsDeferred = false + + return store.writeInfo(uploadId, info) +} + func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) { uploadId, multipartId := splitIds(id) @@ -551,6 +594,74 @@ func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) { return parts, nil } +func (store S3Store) downloadIncompletePartForUpload(uploadId string) (*os.File, int64, error) { + incompleteUploadObject, err := store.getIncompletePartForUpload(uploadId) + if err != nil { + return nil, 0, err + } + if incompleteUploadObject == nil { + // We did not find an incomplete upload + return nil, 0, nil + } + defer incompleteUploadObject.Body.Close() + + partFile, err := ioutil.TempFile("", "tusd-s3-tmp-") + if err != nil { + return nil, 0, err + } + + n, err := io.Copy(partFile, incompleteUploadObject.Body) + if err != nil { + return nil, 0, err + } + if n < *incompleteUploadObject.ContentLength { + return nil, 0, errors.New("short read of incomplete upload") + } + + _, err = partFile.Seek(0, 0) + if err != nil { + return nil, 0, err + } + + return partFile, n, nil +} + +func (store S3Store) getIncompletePartForUpload(uploadId string) (*s3.GetObjectOutput, error) { + obj, err := store.Service.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId + ".part"), + }) + + if err != nil && (isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "AccessDenied")) { + return nil, nil + } + + return obj, err +} + +func (store S3Store) putIncompletePartForUpload(uploadId string, r io.ReadSeeker) error { + _, err := store.Service.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId + ".part"), + Body: r, + }) + return err +} + +func (store S3Store) deleteIncompletePartForUpload(uploadId string) error { + _, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(store.Bucket), + Delete: &s3.Delete{ + Objects: []*s3.ObjectIdentifier{ + { + Key: store.keyWithPrefix(uploadId + ".part"), + }, + }, + }, + }) + return err +} + func splitIds(id string) (uploadId, multipartId string) { index := strings.Index(id, "+") if index == -1 { diff --git a/s3store/s3store_mock_test.go b/s3store/s3store_mock_test.go index 0fc2125..705f251 100644 --- a/s3store/s3store_mock_test.go +++ b/s3store/s3store_mock_test.go @@ -84,6 +84,17 @@ func (_mr *_MockS3APIRecorder) GetObject(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObject", arg0) } +func (_m *MockS3API) HeadObject(_param0 *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + ret := _m.ctrl.Call(_m, "HeadObject", _param0) + ret0, _ := ret[0].(*s3.HeadObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockS3APIRecorder) HeadObject(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "HeadObject", arg0) +} + func (_m *MockS3API) ListParts(_param0 *s3.ListPartsInput) (*s3.ListPartsOutput, error) { ret := _m.ctrl.Call(_m, "ListParts", _param0) ret0, _ := ret[0].(*s3.ListPartsOutput) diff --git a/s3store/s3store_test.go b/s3store/s3store_test.go index 77deee5..57c7d22 100644 --- a/s3store/s3store_test.go +++ b/s3store/s3store_test.go @@ -202,6 +202,10 @@ func TestGetInfo(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), ) info, err := store.GetInfo("uploadId+multipartId") @@ -213,6 +217,41 @@ func TestGetInfo(t *testing.T) { assert.Equal("menĂ¼", info.MetaData["bar"]) } +func TestGetInfoWithIncompletePart(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(10), + }, nil), + ) + + info, err := store.GetInfo("uploadId+multipartId") + assert.Nil(err) + assert.Equal(int64(10), info.Offset) + assert.Equal("uploadId+multipartId", info.ID) +} + func TestGetInfoFinished(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -316,6 +355,47 @@ func TestGetReaderNotFinished(t *testing.T) { assert.Equal("cannot stream non-finished upload", err.Error()) } +func TestDeclareLength(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{}, + }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(0), + }, nil), + s3obj.EXPECT().PutObject(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`)), + ContentLength: aws.Int64(int64(144)), + }), + ) + + err := store.DeclareLength("uploadId+multipartId", 500) + assert.Nil(err) +} + func TestFinishUpload(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -421,6 +501,10 @@ func TestWriteChunk(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -436,6 +520,10 @@ func TestWriteChunk(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -457,16 +545,19 @@ func TestWriteChunk(t *testing.T) { PartNumber: aws.Int64(5), Body: bytes.NewReader([]byte("90AB")), })).Return(nil, nil), + s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("CD")), + })).Return(nil, nil), ) - // The last bytes "CD" will be ignored, as they are not the last bytes of the - // upload (500 bytes total) and not of full part-size. bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD"))) assert.Nil(err) - assert.Equal(int64(12), bytesRead) + assert.Equal(int64(14), bytesRead) } -func TestWriteChunkDropTooSmall(t *testing.T) { +func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -496,6 +587,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil)), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -511,11 +606,168 @@ func TestWriteChunkDropTooSmall(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), + s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("1234567890")), + })).Return(nil, nil), ) bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890"))) assert.Nil(err) - assert.Equal(int64(0), bytesRead) + assert.Equal(int64(10), bytesRead) +} + +func TestWriteChunkPrependsIncompletePart(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + store.MaxPartSize = 8 + store.MinPartSize = 4 + store.MaxMultipartParts = 10000 + store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(store.Bucket), + Delete: &s3.Delete{ + Objects: []*s3.ObjectIdentifier{ + { + Key: aws.String("uploadId.part"), + }, + }, + }, + }).Return(nil, nil), + s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(1), + Body: bytes.NewReader([]byte("1234")), + })).Return(nil, nil), + s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(2), + Body: bytes.NewReader([]byte("5")), + })).Return(nil, nil), + ) + + bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) + assert.Nil(err) + assert.Equal(int64(2), bytesRead) +} + +func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + store.MaxPartSize = 8 + store.MinPartSize = 4 + store.MaxMultipartParts = 10000 + store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(store.Bucket), + Delete: &s3.Delete{ + Objects: []*s3.ObjectIdentifier{ + { + Key: aws.String("uploadId.part"), + }, + }, + }, + }).Return(nil, nil), + s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(1), + Body: bytes.NewReader([]byte("1234")), + })).Return(nil, nil), + s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("5")), + })).Return(nil, nil), + ) + + bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) + assert.Nil(err) + assert.Equal(int64(2), bytesRead) } func TestWriteChunkAllowTooSmallLast(t *testing.T) { @@ -549,6 +801,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil)), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -564,6 +820,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -603,6 +863,9 @@ func TestTerminate(t *testing.T) { { Key: aws.String("uploadId"), }, + { + Key: aws.String("uploadId.part"), + }, { Key: aws.String("uploadId.info"), }, @@ -638,6 +901,9 @@ func TestTerminateWithErrors(t *testing.T) { { Key: aws.String("uploadId"), }, + { + Key: aws.String("uploadId.part"), + }, { Key: aws.String("uploadId.info"), }, diff --git a/s3store/s3store_util_test.go b/s3store/s3store_util_test.go index 10553d6..9dbb083 100644 --- a/s3store/s3store_util_test.go +++ b/s3store/s3store_util_test.go @@ -55,3 +55,50 @@ func (m UploadPartInputMatcher) String() string { m.expect.Body.Seek(0, 0) return fmt.Sprintf("UploadPartInput(%d: %s)", *m.expect.PartNumber, body) } + +type PutObjectInputMatcher struct { + expect *s3.PutObjectInput +} + +func NewPutObjectInputMatcher(expect *s3.PutObjectInput) gomock.Matcher { + return PutObjectInputMatcher{ + expect: expect, + } +} + +func (m PutObjectInputMatcher) Matches(x interface{}) bool { + input, ok := x.(*s3.PutObjectInput) + if !ok { + return false + } + + inputBody := input.Body + expectBody := m.expect.Body + + i, err := ioutil.ReadAll(inputBody) + if err != nil { + panic(err) + } + inputBody.Seek(0, 0) + + e, err := ioutil.ReadAll(expectBody) + if err != nil { + panic(err) + } + m.expect.Body.Seek(0, 0) + + if !reflect.DeepEqual(e, i) { + return false + } + + input.Body = nil + m.expect.Body = nil + + return reflect.DeepEqual(m.expect, input) +} + +func (m PutObjectInputMatcher) String() string { + body, _ := ioutil.ReadAll(m.expect.Body) + m.expect.Body.Seek(0, 0) + return fmt.Sprintf(`PutObjectInput(Body: "%s")`, body) +}