From 6662f43d01d786d510262bb0ce1b47ffcf78f25d Mon Sep 17 00:00:00 2001
From: Adam Jensen
Date: Wed, 29 Jul 2020 09:24:46 -0400
Subject: [PATCH] s3Store: Concurrently write upload parts to S3 while reading from client (#402)

* Allow empty metadata values
* Make tests less fragile by allowing loose call ordering
* Add s3ChunkProducer
* Integrate s3ChunkProducer to support chunk buffering
* Remove completed chunk files inline to reduce disk space usage
* Add tests for chunk producer
* docs: Use value from Host header to forward to tusd
* Use int64 for MaxBufferedParts field
* Default to 20 buffered parts
* Rename s3ChunkProducer -> s3PartProducer
* Document s3PartProducer struct
* Clarify misleading comment
* Revert "Remove completed chunk files inline to reduce disk space usage"

  This reverts commit b72a4d43d6c05d11fa9494b068cf465761e00bcf.

* Remove redundant seek

  This is already being done in s3PartProducer.

* Clean up any remaining files in the channel when we return
* Make putPart* functions responsible for cleaning up temp files
* handler: Add tests for empty metadata pairs
* Factor out cleanUpTempFile func
* Add test to ensure that temporary files get cleaned up

Co-authored-by: Jens Steinhauser
Co-authored-by: Marius
---
 pkg/s3store/s3store.go                    | 181 ++++++--
 pkg/s3store/s3store_part_producer_test.go | 159 +++++++
 pkg/s3store/s3store_test.go               | 531 +++++++++++-----------
 3 files changed, 567 insertions(+), 304 deletions(-)
 create mode 100644 pkg/s3store/s3store_part_producer_test.go

diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go
index d3f7224..3c6b583 100644
--- a/pkg/s3store/s3store.go
+++ b/pkg/s3store/s3store.go
@@ -135,6 +135,15 @@ type S3Store struct {
 	// MaxObjectSize is the maximum size an S3 Object can have according to S3
 	// API specifications. See link above.
 	MaxObjectSize int64
+	// MaxBufferedParts is the number of additional parts that can be received from
+	// the client and stored on disk while a part is being uploaded to S3. This
+	// can help improve throughput by not blocking the client while tusd is
+	// communicating with the S3 API, which can have unpredictable latency.
+	MaxBufferedParts int64
+	// TemporaryDirectory is the path where S3Store will create temporary files
+	// on disk during the upload. An empty string ("", the default value) will
+	// cause S3Store to use the operating system's default temporary directory.
+	TemporaryDirectory string
 }

 type S3API interface {
@@ -153,12 +162,14 @@ type S3API interface {

 // New constructs a new storage using the supplied bucket and service object.
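Editorial aside, not part of the patch: given the two new fields documented above, a deployment that constructs the S3 store itself could tune the buffering and the temp-file location roughly as follows. This is a hedged sketch; the bucket name, directory path, and session setup are placeholders, not values from this change.

package main

import (
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/tus/tusd/pkg/s3store"
)

func newStore() s3store.S3Store {
	sess := session.Must(session.NewSession())
	store := s3store.New("my-bucket", s3.New(sess))
	// Allow up to 20 parts (the new default) to sit on disk while an earlier
	// part is still being uploaded to S3.
	store.MaxBufferedParts = 20
	// Place those temporary part files somewhere other than the OS default
	// temp directory ("" would fall back to os.TempDir()).
	store.TemporaryDirectory = "/var/tmp/tusd"
	return store
}

func main() {
	_ = newStore()
}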
func New(bucket string, service S3API) S3Store { return S3Store{ - Bucket: bucket, - Service: service, - MaxPartSize: 5 * 1024 * 1024 * 1024, - MinPartSize: 5 * 1024 * 1024, - MaxMultipartParts: 10000, - MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, + Bucket: bucket, + Service: service, + MaxPartSize: 5 * 1024 * 1024 * 1024, + MinPartSize: 5 * 1024 * 1024, + MaxMultipartParts: 10000, + MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, + MaxBufferedParts: 20, + TemporaryDirectory: "", } } @@ -272,6 +283,73 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er return err } +// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk +type s3PartProducer struct { + store *S3Store + files chan<- *os.File + done chan struct{} + err error + r io.Reader +} + +func (spp *s3PartProducer) produce(partSize int64) { + for { + file, err := spp.nextPart(partSize) + if err != nil { + spp.err = err + close(spp.files) + return + } + if file == nil { + close(spp.files) + return + } + select { + case spp.files <- file: + case <-spp.done: + close(spp.files) + return + } + } +} + +func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) { + // Create a temporary file to store the part + file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-") + if err != nil { + return nil, err + } + + limitedReader := io.LimitReader(spp.r, size) + n, err := io.Copy(file, limitedReader) + + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for S3Store it's not important whether the stream has ended + // on purpose or accidentally. Therefore, we ignore this error to not + // prevent the remaining chunk to be stored on S3. + if err == io.ErrUnexpectedEOF { + err = nil + } + + if err != nil { + return nil, err + } + + // If the entire request body is read and no more data is available, + // io.Copy returns 0 since it is unable to read any bytes. In that + // case, we can close the s3PartProducer. + if n == 0 { + cleanUpTempFile(file) + return nil, nil + } + + // Seek to the beginning of the file + file.Seek(0, 0) + + return file, nil +} + func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { id := upload.id store := upload.store @@ -305,8 +383,7 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read return 0, err } if incompletePartFile != nil { - defer os.Remove(incompletePartFile.Name()) - defer incompletePartFile.Close() + defer cleanUpTempFile(incompletePartFile) if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil { return 0, err @@ -315,49 +392,43 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read src = io.MultiReader(incompletePartFile, src) } - for { - // Create a temporary file to store the part in it - file, err := ioutil.TempFile("", "tusd-s3-tmp-") + fileChan := make(chan *os.File, store.MaxBufferedParts) + doneChan := make(chan struct{}) + defer close(doneChan) + + // If we panic or return while there are still files in the channel, then + // we may leak file descriptors. Let's ensure that those are cleaned up. 
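Editorial aside, not part of the patch: the deferred drain that follows keeps receiving until the producer closes fileChan, and the end-of-body check in nextPart above relies on io.Copy returning (0, nil) once the request body is exhausted. That behaviour is small enough to show in a standalone snippet; the reader and sizes here are illustrative only.

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	r := strings.NewReader("abc")
	// First copy through a LimitReader drains the source.
	n1, err1 := io.Copy(ioutil.Discard, io.LimitReader(r, 10))
	// Second copy finds nothing left to read.
	n2, err2 := io.Copy(ioutil.Discard, io.LimitReader(r, 10))
	fmt.Println(n1, err1) // 3 <nil>
	fmt.Println(n2, err2) // 0 <nil>, which the part producer treats as "no more parts"
}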
+ defer func() { + for file := range fileChan { + cleanUpTempFile(file) + } + }() + + partProducer := s3PartProducer{ + store: store, + done: doneChan, + files: fileChan, + r: src, + } + go partProducer.produce(optimalPartSize) + + for file := range fileChan { + stat, err := file.Stat() if err != nil { - return bytesUploaded, err + return 0, err } - defer os.Remove(file.Name()) - defer file.Close() - - limitedReader := io.LimitReader(src, optimalPartSize) - n, err := io.Copy(file, limitedReader) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for S3Store it's not important whether the stream has ended - // on purpose or accidentally. Therefore, we ignore this error to not - // prevent the remaining chunk to be stored on S3. - if err == io.ErrUnexpectedEOF { - err = nil - } - - // io.Copy does not return io.EOF, so we not have to handle it differently. - if err != nil { - return bytesUploaded, err - } - // If io.Copy is finished reading, it will always return (0, nil). - if n == 0 { - return (bytesUploaded - incompletePartSize), nil - } - - // Seek to the beginning of the file - file.Seek(0, 0) + n := stat.Size() isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n) if n >= store.MinPartSize || isFinalChunk { - _, err = store.Service.UploadPartWithContext(ctx, &s3.UploadPartInput{ + uploadPartInput := &s3.UploadPartInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(uploadId), UploadId: aws.String(multipartId), PartNumber: aws.Int64(nextPartNum), Body: file, - }) - if err != nil { + } + if err := upload.putPartForUpload(ctx, uploadPartInput, file); err != nil { return bytesUploaded, err } } else { @@ -374,6 +445,20 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read bytesUploaded += n nextPartNum += 1 } + + return bytesUploaded - incompletePartSize, partProducer.err +} + +func cleanUpTempFile(file *os.File) { + file.Close() + os.Remove(file.Name()) +} + +func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File) error { + defer cleanUpTempFile(file) + + _, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput) + return err } func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) { @@ -643,13 +728,11 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads uploadId, multipartId := splitIds(id) // Create a temporary file for holding the concatenated data - file, err := ioutil.TempFile("", "tusd-s3-concat-tmp-") + file, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-concat-tmp-") if err != nil { return err } - fmt.Println(file.Name()) - defer os.Remove(file.Name()) - defer file.Close() + defer cleanUpTempFile(file) // Download each part and append it to the temporary file for _, partialUpload := range partialUploads { @@ -790,7 +873,7 @@ func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, upload } defer incompleteUploadObject.Body.Close() - partFile, err := ioutil.TempFile("", "tusd-s3-tmp-") + partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-") if err != nil { return nil, 0, err } @@ -824,11 +907,13 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st return obj, err } -func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, r io.ReadSeeker) error { 
+func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error { + defer cleanUpTempFile(file) + _, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{ Bucket: aws.String(store.Bucket), Key: store.metadataKeyWithPrefix(uploadId + ".part"), - Body: r, + Body: file, }) return err } diff --git a/pkg/s3store/s3store_part_producer_test.go b/pkg/s3store/s3store_part_producer_test.go new file mode 100644 index 0000000..ad7ad2f --- /dev/null +++ b/pkg/s3store/s3store_part_producer_test.go @@ -0,0 +1,159 @@ +package s3store + +import ( + "errors" + "os" + "strings" + "testing" + "time" +) + +type InfiniteZeroReader struct{} + +func (izr InfiniteZeroReader) Read(b []byte) (int, error) { + b[0] = 0 + return 1, nil +} + +type ErrorReader struct{} + +func (ErrorReader) Read(b []byte) (int, error) { + return 0, errors.New("error from ErrorReader") +} + +func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) { + fileChan := make(chan *os.File) + doneChan := make(chan struct{}) + expectedStr := "test" + r := strings.NewReader(expectedStr) + pp := s3PartProducer{ + store: &S3Store{}, + done: doneChan, + files: fileChan, + r: r, + } + go pp.produce(1) + + actualStr := "" + b := make([]byte, 1) + for f := range fileChan { + n, err := f.Read(b) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if n != 1 { + t.Fatalf("incorrect number of bytes read: wanted %d, got %d", 1, n) + } + actualStr += string(b) + + os.Remove(f.Name()) + f.Close() + } + + if actualStr != expectedStr { + t.Errorf("incorrect string read from channel: wanted %s, got %s", expectedStr, actualStr) + } + + if pp.err != nil { + t.Errorf("unexpected error from part producer: %s", pp.err) + } +} + +func TestPartProducerExitsWhenDoneChannelIsClosed(t *testing.T) { + fileChan := make(chan *os.File) + doneChan := make(chan struct{}) + pp := s3PartProducer{ + store: &S3Store{}, + done: doneChan, + files: fileChan, + r: InfiniteZeroReader{}, + } + + completedChan := make(chan struct{}) + go func() { + pp.produce(10) + completedChan <- struct{}{} + }() + + close(doneChan) + + select { + case <-completedChan: + // producer exited cleanly + case <-time.After(2 * time.Second): + t.Error("timed out waiting for producer to exit") + } + + safelyDrainChannelOrFail(fileChan, t) +} + +func TestPartProducerExitsWhenDoneChannelIsClosedBeforeAnyPartIsSent(t *testing.T) { + fileChan := make(chan *os.File) + doneChan := make(chan struct{}) + pp := s3PartProducer{ + store: &S3Store{}, + done: doneChan, + files: fileChan, + r: InfiniteZeroReader{}, + } + + close(doneChan) + + completedChan := make(chan struct{}) + go func() { + pp.produce(10) + completedChan <- struct{}{} + }() + + select { + case <-completedChan: + // producer exited cleanly + case <-time.After(2 * time.Second): + t.Error("timed out waiting for producer to exit") + } + + safelyDrainChannelOrFail(fileChan, t) +} + +func TestPartProducerExitsWhenUnableToReadFromFile(t *testing.T) { + fileChan := make(chan *os.File) + doneChan := make(chan struct{}) + pp := s3PartProducer{ + store: &S3Store{}, + done: doneChan, + files: fileChan, + r: ErrorReader{}, + } + + completedChan := make(chan struct{}) + go func() { + pp.produce(10) + completedChan <- struct{}{} + }() + + select { + case <-completedChan: + // producer exited cleanly + case <-time.After(2 * time.Second): + t.Error("timed out waiting for producer to exit") + } + + safelyDrainChannelOrFail(fileChan, t) + + if pp.err == nil { + t.Error("expected an error but 
didn't get one") + } +} + +func safelyDrainChannelOrFail(c chan *os.File, t *testing.T) { + // At this point, we've signaled that the producer should exit, but it may write a few files + // into the channel before closing it and exiting. Make sure that we get a nil value + // eventually. + for i := 0; i < 100; i++ { + if f := <-c; f == nil { + return + } + } + + t.Fatal("timed out waiting for channel to drain") +} diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 210d00c..a30c4b5 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "strings" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/s3" "github.com/tus/tusd/pkg/handler" ) @@ -691,51 +693,37 @@ func TestWriteChunk(t *testing.T) { store.MaxMultipartParts = 10000 store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), + }, + { + Size: aws.Int64(200), + }, + }, + }, nil).Times(2) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) + gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, - }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, - }, - }, nil), - 
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -787,57 +775,40 @@ func TestWriteChunkWithUnexpectedEOF(t *testing.T) { store.MaxMultipartParts = 10000 store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, + { + Size: aws.Int64(200), }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), - s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - Body: bytes.NewReader([]byte("1234567890ABCD")), - })).Return(nil, nil), - ) + }, + }, nil).Times(2) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The 
specified key does not exist.", nil)) + s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("1234567890ABCD")), + })).Return(nil, nil) reader, writer := io.Pipe() @@ -862,47 +833,33 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), + }, + { + Size: aws.Int64(200), + }, + }, + }, nil).Times(2) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil)) + gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, - }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil)), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, - }, - }, nil), s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), @@ -934,43 +891,38 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { store.MaxMultipartParts = 10000 store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + 
Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{ + ContentLength: aws.Int64(3), + Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), + }, nil) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{ + ContentLength: aws.Int64(3), + Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), + }, nil) + s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String(store.Bucket), + Key: aws.String("uploadId.part"), + }).Return(&s3.DeleteObjectOutput{}, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2) + gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{ - ContentLength: aws.Int64(3), - Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{ - ContentLength: aws.Int64(3), - Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), - }, nil), - s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{ - Bucket: aws.String(store.Bucket), - Key: aws.String("uploadId.part"), - }).Return(&s3.DeleteObjectOutput{}, nil), s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -1007,43 +959,31 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing. 
store.MaxMultipartParts = 10000 store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{ + ContentLength: aws.Int64(3), + Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), + }, nil).Times(2) + s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String(store.Bucket), + Key: aws.String("uploadId.part"), + }).Return(&s3.DeleteObjectOutput{}, nil) + gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{ - ContentLength: aws.Int64(3), - Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), - ContentLength: aws.Int64(3), - }, nil), - s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{ - Bucket: aws.String(store.Bucket), - Key: aws.String("uploadId.part"), - }).Return(&s3.DeleteObjectOutput{}, nil), s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -1075,59 +1015,42 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { store := New("bucket", s3obj) store.MinPartSize = 20 - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: 
ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(400), - }, - { - Size: aws.Int64(90), - }, + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(400), }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil)), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(400), - }, - { - Size: aws.Int64(90), - }, + { + Size: aws.Int64(90), }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(3), - Body: bytes.NewReader([]byte("1234567890")), - })).Return(nil, nil), - ) + }, + }, nil).Times(2) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil)) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(3), + Body: bytes.NewReader([]byte("1234567890")), + })).Return(nil, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -1396,3 +1319,99 @@ func TestConcatUploadsUsingDownload(t *testing.T) { // Wait a short delay until the call to AbortMultipartUploadWithContext also occurs. 
<-time.After(10 * time.Millisecond) } + +type s3APIWithTempFileAssertion struct { + *MockS3API + assert *assert.Assertions + tempDir string +} + +func (s s3APIWithTempFileAssertion) UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) { + assert := s.assert + + // Make sure that only the two temporary files from tusd are in here. + files, err := ioutil.ReadDir(s.tempDir) + assert.Nil(err) + for _, file := range files { + assert.True(strings.HasPrefix(file.Name(), "tusd-s3-tmp-")) + } + assert.Equal(len(files), 2) + + return nil, fmt.Errorf("not now") +} + +// This test ensures that the S3Store will cleanup all files that it creates during +// a call to WriteChunk, even if an error occurs during that invocation. +// Here, we provide 14 bytes to WriteChunk and since the PartSize is set to 10, +// it will split the input into two parts (10 bytes and 4 bytes). +// Inside the first call to UploadPartWithContext, we assert that the temporary files +// for both parts have been created and we return an error. +// In the end, we assert that the error bubbled up and that all temporary files have +// been cleaned up. +func TestWriteChunkCleansUpTempFiles(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + // Create a temporary directory, so no files get mixed in. + tempDir, err := ioutil.TempDir("", "tusd-s3-cleanup-tests-") + assert.Nil(err) + + s3obj := NewMockS3API(mockCtrl) + s3api := s3APIWithTempFileAssertion{ + MockS3API: s3obj, + assert: assert, + tempDir: tempDir, + } + store := New("bucket", s3api) + store.MaxPartSize = 10 + store.MinPartSize = 10 + store.MaxMultipartParts = 10000 + store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + store.TemporaryDirectory = tempDir + + // The usual S3 calls for retrieving the upload + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), + }, + { + Size: aws.Int64(200), + }, + }, + }, nil).Times(2) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) + + // No calls to s3obj.EXPECT().UploadPartWithContext since that is handled by s3APIWithTempFileAssertion + + upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + assert.Nil(err) + + bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890ABCD"))) + assert.NotNil(err) + assert.Equal(err.Error(), "not now") + assert.Equal(int64(0), bytesRead) + + files, err := ioutil.ReadDir(tempDir) + assert.Nil(err) + 
assert.Equal(len(files), 0)
}
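Editorial aside, not part of the patch: for readers skimming the diff, the concurrency pattern that WriteChunk now follows can be condensed into a short, self-contained model. The names (writeChunk, producePart) and the stripped-down error handling are simplifications rather than code from this change; the real implementation also records producer errors in s3PartProducer.err, honors MinPartSize, and handles incomplete parts.

package main

import (
	"io"
	"io/ioutil"
	"os"
	"strings"
)

// producePart copies up to partSize bytes from src into a temp file and returns
// it, or (nil, nil) once src is exhausted.
func producePart(src io.Reader, partSize int64) (*os.File, error) {
	f, err := ioutil.TempFile("", "sketch-part-")
	if err != nil {
		return nil, err
	}
	n, err := io.Copy(f, io.LimitReader(src, partSize))
	if err != nil || n == 0 {
		f.Close()
		os.Remove(f.Name())
		return nil, err
	}
	f.Seek(0, 0)
	return f, nil
}

func writeChunk(src io.Reader, partSize int64, maxBuffered int) error {
	files := make(chan *os.File, maxBuffered) // bounded buffer of ready parts
	done := make(chan struct{})
	defer close(done)

	// Producer: keep reading the request body while the consumer talks to S3.
	go func() {
		defer close(files)
		for {
			f, err := producePart(src, partSize)
			if f == nil || err != nil {
				return
			}
			select {
			case files <- f:
			case <-done: // consumer returned early; discard this part
				f.Close()
				os.Remove(f.Name())
				return
			}
		}
	}()

	// If we return early, drain and remove any parts already produced so that
	// no temp file or descriptor leaks.
	defer func() {
		for f := range files {
			f.Close()
			os.Remove(f.Name())
		}
	}()

	// Consumer: upload each part, then remove its temp file.
	for f := range files {
		// An UploadPart call would go here.
		f.Close()
		os.Remove(f.Name())
	}
	return nil
}

func main() {
	// 14 bytes with a 10-byte part size yields one full part and one short part,
	// mirroring the cleanup test above.
	_ = writeChunk(strings.NewReader("1234567890ABCD"), 10, 2)
}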