diff --git a/s3store/s3store.go b/s3store/s3store.go index 1beab11..1f87714 100644 --- a/s3store/s3store.go +++ b/s3store/s3store.go @@ -53,21 +53,10 @@ // // In order to support tus' principle of resumable upload, S3's Multipart-Uploads // are internally used. -// For each incoming PATCH request (a call to WriteChunk), a new part is uploaded -// to S3. However, each part of a multipart upload, except the last one, must -// be 5MB or bigger. This introduces a problem, since in tus' perspective -// it's totally fine to upload just a few kilobytes in a single request. -// -// Therefore, a few special conditions have been implemented: -// -// Each PATCH request must contain a body of, at least, 5MB. If the size -// is smaller than this limit, the entire request will be dropped and not -// even passed to the storage server. If your server supports a different -// limit, you can adjust this value using S3Store.MinPartSize. // // When receiving a PATCH request, its body will be temporarily stored on disk. // This requirement has been made to ensure the minimum size of a single part -// and to allow the calculating of a checksum. Once the part has been uploaded +// and to allow the AWS SDK to calculate a checksum. Once the part has been uploaded // to S3, the temporary file will be removed immediately. Therefore, please // ensure that the server running this storage backend has enough disk space // available to hold these caches. @@ -145,6 +134,7 @@ type S3API interface { PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error) ListParts(input *s3.ListPartsInput) (*s3.ListPartsOutput, error) UploadPart(input *s3.UploadPartInput) (*s3.UploadPartOutput, error) + HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) CreateMultipartUpload(input *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) AbortMultipartUpload(input *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) @@ -173,6 +163,7 @@ func (store S3Store) UseIn(composer *tusd.StoreComposer) { composer.UseFinisher(store) composer.UseGetReader(store) composer.UseConcater(store) + composer.UseLengthDeferrer(store) } func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) { @@ -211,9 +202,18 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) { id = uploadId + "+" + *res.UploadId info.ID = id + err = store.writeInfo(uploadId, info) + if err != nil { + return "", fmt.Errorf("s3store: unable to create info file:\n%s", err) + } + + return id, nil +} + +func (store S3Store) writeInfo(uploadId string, info tusd.FileInfo) error { infoJson, err := json.Marshal(info) if err != nil { - return "", err + return err } // Create object on S3 containing information about the file @@ -223,11 +223,8 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) { Body: bytes.NewReader(infoJson), ContentLength: aws.Int64(int64(len(infoJson))), }) - if err != nil { - return "", fmt.Errorf("s3store: unable to create info file:\n%s", err) - } - return id, nil + return err } func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { @@ -243,7 +240,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, bytesUploaded := int64(0) optimalPartSize, err := store.calcOptimalPartSize(size) if err != nil { - return bytesUploaded, err + return 0, err } // Get number of parts to generate next number @@ -255,6 +252,21 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, numParts := len(parts) nextPartNum := int64(numParts + 1) + incompletePartFile, incompletePartSize, err := store.downloadIncompletePartForUpload(uploadId) + if err != nil { + return 0, err + } + if incompletePartFile != nil { + defer incompletePartFile.Close() + defer os.Remove(incompletePartFile.Name()) + + if err := store.deleteIncompletePartForUpload(uploadId); err != nil { + return 0, err + } + + src = io.MultiReader(incompletePartFile, src) + } + for { // Create a temporary file to store the part in it file, err := ioutil.TempFile("", "tusd-s3-tmp-") @@ -272,31 +284,32 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, } // If io.Copy is finished reading, it will always return (0, nil). if n == 0 { - return bytesUploaded, nil - } - - if !info.SizeIsDeferred { - if (size - offset) <= optimalPartSize { - if (size - offset) != n { - return bytesUploaded, nil - } - } else if n < optimalPartSize { - return bytesUploaded, nil - } + return (bytesUploaded - incompletePartSize), nil } // Seek to the beginning of the file file.Seek(0, 0) - _, err = store.Service.UploadPart(&s3.UploadPartInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(uploadId), - UploadId: aws.String(multipartId), - PartNumber: aws.Int64(nextPartNum), - Body: file, - }) - if err != nil { - return bytesUploaded, err + isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n) + if n >= store.MinPartSize || isFinalChunk { + _, err = store.Service.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId), + UploadId: aws.String(multipartId), + PartNumber: aws.Int64(nextPartNum), + Body: file, + }) + if err != nil { + return bytesUploaded, err + } + } else { + if err := store.putIncompletePartForUpload(uploadId, file); err != nil { + return bytesUploaded, err + } + + bytesUploaded += n + + return (bytesUploaded - incompletePartSize), nil } offset += n @@ -346,6 +359,21 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) { offset += *part.Size } + headResult, err := store.Service.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId + ".part"), + }) + if err != nil { + if !isAwsError(err, s3.ErrCodeNoSuchKey) && !isAwsError(err, "AccessDenied") { + return info, err + } + + err = nil + } + if headResult != nil && headResult.ContentLength != nil { + offset += *headResult.ContentLength + } + info.Offset = offset return @@ -415,7 +443,7 @@ func (store S3Store) Terminate(id string) error { go func() { defer wg.Done() - // Delete the info and content file + // Delete the info and content files res, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{ Bucket: aws.String(store.Bucket), Delete: &s3.Delete{ @@ -423,6 +451,9 @@ func (store S3Store) Terminate(id string) error { { Key: store.keyWithPrefix(uploadId), }, + { + Key: store.keyWithPrefix(uploadId + ".part"), + }, { Key: store.keyWithPrefix(uploadId + ".info"), }, @@ -524,6 +555,18 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error { return store.FinishUpload(dest) } +func (store S3Store) DeclareLength(id string, length int64) error { + uploadId, _ := splitIds(id) + info, err := store.GetInfo(id) + if err != nil { + return err + } + info.Size = length + info.SizeIsDeferred = false + + return store.writeInfo(uploadId, info) +} + func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) { uploadId, multipartId := splitIds(id) @@ -551,6 +594,74 @@ func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) { return parts, nil } +func (store S3Store) downloadIncompletePartForUpload(uploadId string) (*os.File, int64, error) { + incompleteUploadObject, err := store.getIncompletePartForUpload(uploadId) + if err != nil { + return nil, 0, err + } + if incompleteUploadObject == nil { + // We did not find an incomplete upload + return nil, 0, nil + } + defer incompleteUploadObject.Body.Close() + + partFile, err := ioutil.TempFile("", "tusd-s3-tmp-") + if err != nil { + return nil, 0, err + } + + n, err := io.Copy(partFile, incompleteUploadObject.Body) + if err != nil { + return nil, 0, err + } + if n < *incompleteUploadObject.ContentLength { + return nil, 0, errors.New("short read of incomplete upload") + } + + _, err = partFile.Seek(0, 0) + if err != nil { + return nil, 0, err + } + + return partFile, n, nil +} + +func (store S3Store) getIncompletePartForUpload(uploadId string) (*s3.GetObjectOutput, error) { + obj, err := store.Service.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId + ".part"), + }) + + if err != nil && (isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "AccessDenied")) { + return nil, nil + } + + return obj, err +} + +func (store S3Store) putIncompletePartForUpload(uploadId string, r io.ReadSeeker) error { + _, err := store.Service.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId + ".part"), + Body: r, + }) + return err +} + +func (store S3Store) deleteIncompletePartForUpload(uploadId string) error { + _, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(store.Bucket), + Delete: &s3.Delete{ + Objects: []*s3.ObjectIdentifier{ + { + Key: store.keyWithPrefix(uploadId + ".part"), + }, + }, + }, + }) + return err +} + func splitIds(id string) (uploadId, multipartId string) { index := strings.Index(id, "+") if index == -1 { diff --git a/s3store/s3store_mock_test.go b/s3store/s3store_mock_test.go index 0fc2125..705f251 100644 --- a/s3store/s3store_mock_test.go +++ b/s3store/s3store_mock_test.go @@ -84,6 +84,17 @@ func (_mr *_MockS3APIRecorder) GetObject(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObject", arg0) } +func (_m *MockS3API) HeadObject(_param0 *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) { + ret := _m.ctrl.Call(_m, "HeadObject", _param0) + ret0, _ := ret[0].(*s3.HeadObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockS3APIRecorder) HeadObject(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "HeadObject", arg0) +} + func (_m *MockS3API) ListParts(_param0 *s3.ListPartsInput) (*s3.ListPartsOutput, error) { ret := _m.ctrl.Call(_m, "ListParts", _param0) ret0, _ := ret[0].(*s3.ListPartsOutput) diff --git a/s3store/s3store_test.go b/s3store/s3store_test.go index 77deee5..57c7d22 100644 --- a/s3store/s3store_test.go +++ b/s3store/s3store_test.go @@ -202,6 +202,10 @@ func TestGetInfo(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), ) info, err := store.GetInfo("uploadId+multipartId") @@ -213,6 +217,41 @@ func TestGetInfo(t *testing.T) { assert.Equal("menĂ¼", info.MetaData["bar"]) } +func TestGetInfoWithIncompletePart(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(10), + }, nil), + ) + + info, err := store.GetInfo("uploadId+multipartId") + assert.Nil(err) + assert.Equal(int64(10), info.Offset) + assert.Equal("uploadId+multipartId", info.ID) +} + func TestGetInfoFinished(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -316,6 +355,47 @@ func TestGetReaderNotFinished(t *testing.T) { assert.Equal("cannot stream non-finished upload", err.Error()) } +func TestDeclareLength(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{}, + }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(0), + }, nil), + s3obj.EXPECT().PutObject(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`)), + ContentLength: aws.Int64(int64(144)), + }), + ) + + err := store.DeclareLength("uploadId+multipartId", 500) + assert.Nil(err) +} + func TestFinishUpload(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -421,6 +501,10 @@ func TestWriteChunk(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -436,6 +520,10 @@ func TestWriteChunk(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -457,16 +545,19 @@ func TestWriteChunk(t *testing.T) { PartNumber: aws.Int64(5), Body: bytes.NewReader([]byte("90AB")), })).Return(nil, nil), + s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("CD")), + })).Return(nil, nil), ) - // The last bytes "CD" will be ignored, as they are not the last bytes of the - // upload (500 bytes total) and not of full part-size. bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD"))) assert.Nil(err) - assert.Equal(int64(12), bytesRead) + assert.Equal(int64(14), bytesRead) } -func TestWriteChunkDropTooSmall(t *testing.T) { +func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() assert := assert.New(t) @@ -496,6 +587,10 @@ func TestWriteChunkDropTooSmall(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil)), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -511,11 +606,168 @@ func TestWriteChunkDropTooSmall(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), + s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("1234567890")), + })).Return(nil, nil), ) bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890"))) assert.Nil(err) - assert.Equal(int64(0), bytesRead) + assert.Equal(int64(10), bytesRead) +} + +func TestWriteChunkPrependsIncompletePart(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + store.MaxPartSize = 8 + store.MinPartSize = 4 + store.MaxMultipartParts = 10000 + store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(store.Bucket), + Delete: &s3.Delete{ + Objects: []*s3.ObjectIdentifier{ + { + Key: aws.String("uploadId.part"), + }, + }, + }, + }).Return(nil, nil), + s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(1), + Body: bytes.NewReader([]byte("1234")), + })).Return(nil, nil), + s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(2), + Body: bytes.NewReader([]byte("5")), + })).Return(nil, nil), + ) + + bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) + assert.Nil(err) + assert.Equal(int64(2), bytesRead) +} + +func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + s3obj := NewMockS3API(mockCtrl) + store := New("bucket", s3obj) + store.MaxPartSize = 8 + store.MinPartSize = 4 + store.MaxMultipartParts = 10000 + store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + + gomock.InOrder( + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null}`))), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().ListParts(&s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), + ContentLength: aws.Int64(3), + }, nil), + s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{ + Bucket: aws.String(store.Bucket), + Delete: &s3.Delete{ + Objects: []*s3.ObjectIdentifier{ + { + Key: aws.String("uploadId.part"), + }, + }, + }, + }).Return(nil, nil), + s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(1), + Body: bytes.NewReader([]byte("1234")), + })).Return(nil, nil), + s3obj.EXPECT().PutObject(NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("5")), + })).Return(nil, nil), + ) + + bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) + assert.Nil(err) + assert.Equal(int64(2), bytesRead) } func TestWriteChunkAllowTooSmallLast(t *testing.T) { @@ -549,6 +801,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil)), s3obj.EXPECT().ListParts(&s3.ListPartsInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -564,6 +820,10 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { }, }, }, nil), + s3obj.EXPECT().GetObject(&s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), s3obj.EXPECT().UploadPart(NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -603,6 +863,9 @@ func TestTerminate(t *testing.T) { { Key: aws.String("uploadId"), }, + { + Key: aws.String("uploadId.part"), + }, { Key: aws.String("uploadId.info"), }, @@ -638,6 +901,9 @@ func TestTerminateWithErrors(t *testing.T) { { Key: aws.String("uploadId"), }, + { + Key: aws.String("uploadId.part"), + }, { Key: aws.String("uploadId.info"), }, diff --git a/s3store/s3store_util_test.go b/s3store/s3store_util_test.go index 10553d6..9dbb083 100644 --- a/s3store/s3store_util_test.go +++ b/s3store/s3store_util_test.go @@ -55,3 +55,50 @@ func (m UploadPartInputMatcher) String() string { m.expect.Body.Seek(0, 0) return fmt.Sprintf("UploadPartInput(%d: %s)", *m.expect.PartNumber, body) } + +type PutObjectInputMatcher struct { + expect *s3.PutObjectInput +} + +func NewPutObjectInputMatcher(expect *s3.PutObjectInput) gomock.Matcher { + return PutObjectInputMatcher{ + expect: expect, + } +} + +func (m PutObjectInputMatcher) Matches(x interface{}) bool { + input, ok := x.(*s3.PutObjectInput) + if !ok { + return false + } + + inputBody := input.Body + expectBody := m.expect.Body + + i, err := ioutil.ReadAll(inputBody) + if err != nil { + panic(err) + } + inputBody.Seek(0, 0) + + e, err := ioutil.ReadAll(expectBody) + if err != nil { + panic(err) + } + m.expect.Body.Seek(0, 0) + + if !reflect.DeepEqual(e, i) { + return false + } + + input.Body = nil + m.expect.Body = nil + + return reflect.DeepEqual(m.expect, input) +} + +func (m PutObjectInputMatcher) String() string { + body, _ := ioutil.ReadAll(m.expect.Body) + m.expect.Body.Seek(0, 0) + return fmt.Sprintf(`PutObjectInput(Body: "%s")`, body) +}