s3Store: Concurrently write upload parts to S3 while reading from client (#402)
* Allow empty metadata values
* Make tests less fragile by allowing loose call ordering
* Add s3ChunkProducer
* Integrate s3ChunkProducer to support chunk buffering
* Remove completed chunk files inline to reduce disk space usage
* Add tests for chunk producer
* docs: Use value from Host header to forward to tusd
* Use int64 for MaxBufferedParts field
* Default to 20 buffered parts
* Rename s3ChunkProducer -> s3PartProducer
* Document s3PartProducer struct
* Clarify misleading comment
* Revert "Remove completed chunk files inline to reduce disk space usage"
This reverts commit b72a4d43d6
.
* Remove redundant seek
This is already being done in s3PartProducer.
* Clean up any remaining files in the channel when we return
* Make putPart* functions responsible for cleaning up temp files
* handler: Add tests for empty metadata pairs
* Factor out cleanUpTempFile func
* Add test to ensure that temporary files get cleaned up
Co-authored-by: Jens Steinhauser <jens.steinhauser@gmail.com>
Co-authored-by: Marius <marius@transloadit.com>
This commit is contained in:
parent
26b84bcb1c
commit
6662f43d01
|
@ -135,6 +135,15 @@ type S3Store struct {
|
|||
// MaxObjectSize is the maximum size an S3 Object can have according to S3
|
||||
// API specifications. See link above.
|
||||
MaxObjectSize int64
|
||||
// MaxBufferedParts is the number of additional parts that can be received from
|
||||
// the client and stored on disk while a part is being uploaded to S3. This
|
||||
// can help improve throughput by not blocking the client while tusd is
|
||||
// communicating with the S3 API, which can have unpredictable latency.
|
||||
MaxBufferedParts int64
|
||||
// TemporaryDirectory is the path where S3Store will create temporary files
|
||||
// on disk during the upload. An empty string ("", the default value) will
|
||||
// cause S3Store to use the operating system's default temporary directory.
|
||||
TemporaryDirectory string
|
||||
}
|
||||
|
||||
type S3API interface {
|
||||
|
@ -159,6 +168,8 @@ func New(bucket string, service S3API) S3Store {
|
|||
MinPartSize: 5 * 1024 * 1024,
|
||||
MaxMultipartParts: 10000,
|
||||
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
|
||||
MaxBufferedParts: 20,
|
||||
TemporaryDirectory: "",
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,6 +283,73 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er
|
|||
return err
|
||||
}
|
||||
|
||||
// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
|
||||
type s3PartProducer struct {
|
||||
store *S3Store
|
||||
files chan<- *os.File
|
||||
done chan struct{}
|
||||
err error
|
||||
r io.Reader
|
||||
}
|
||||
|
||||
func (spp *s3PartProducer) produce(partSize int64) {
|
||||
for {
|
||||
file, err := spp.nextPart(partSize)
|
||||
if err != nil {
|
||||
spp.err = err
|
||||
close(spp.files)
|
||||
return
|
||||
}
|
||||
if file == nil {
|
||||
close(spp.files)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case spp.files <- file:
|
||||
case <-spp.done:
|
||||
close(spp.files)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
|
||||
// Create a temporary file to store the part
|
||||
file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
limitedReader := io.LimitReader(spp.r, size)
|
||||
n, err := io.Copy(file, limitedReader)
|
||||
|
||||
// If the HTTP PATCH request gets interrupted in the middle (e.g. because
|
||||
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
|
||||
// However, for S3Store it's not important whether the stream has ended
|
||||
// on purpose or accidentally. Therefore, we ignore this error to not
|
||||
// prevent the remaining chunk to be stored on S3.
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If the entire request body is read and no more data is available,
|
||||
// io.Copy returns 0 since it is unable to read any bytes. In that
|
||||
// case, we can close the s3PartProducer.
|
||||
if n == 0 {
|
||||
cleanUpTempFile(file)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Seek to the beginning of the file
|
||||
file.Seek(0, 0)
|
||||
|
||||
return file, nil
|
||||
}
|
||||
|
||||
func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
|
||||
id := upload.id
|
||||
store := upload.store
|
||||
|
@ -305,8 +383,7 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
|
|||
return 0, err
|
||||
}
|
||||
if incompletePartFile != nil {
|
||||
defer os.Remove(incompletePartFile.Name())
|
||||
defer incompletePartFile.Close()
|
||||
defer cleanUpTempFile(incompletePartFile)
|
||||
|
||||
if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
|
||||
return 0, err
|
||||
|
@ -315,49 +392,43 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
|
|||
src = io.MultiReader(incompletePartFile, src)
|
||||
}
|
||||
|
||||
for {
|
||||
// Create a temporary file to store the part in it
|
||||
file, err := ioutil.TempFile("", "tusd-s3-tmp-")
|
||||
fileChan := make(chan *os.File, store.MaxBufferedParts)
|
||||
doneChan := make(chan struct{})
|
||||
defer close(doneChan)
|
||||
|
||||
// If we panic or return while there are still files in the channel, then
|
||||
// we may leak file descriptors. Let's ensure that those are cleaned up.
|
||||
defer func() {
|
||||
for file := range fileChan {
|
||||
cleanUpTempFile(file)
|
||||
}
|
||||
}()
|
||||
|
||||
partProducer := s3PartProducer{
|
||||
store: store,
|
||||
done: doneChan,
|
||||
files: fileChan,
|
||||
r: src,
|
||||
}
|
||||
go partProducer.produce(optimalPartSize)
|
||||
|
||||
for file := range fileChan {
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return bytesUploaded, err
|
||||
return 0, err
|
||||
}
|
||||
defer os.Remove(file.Name())
|
||||
defer file.Close()
|
||||
|
||||
limitedReader := io.LimitReader(src, optimalPartSize)
|
||||
n, err := io.Copy(file, limitedReader)
|
||||
|
||||
// If the HTTP PATCH request gets interrupted in the middle (e.g. because
|
||||
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
|
||||
// However, for S3Store it's not important whether the stream has ended
|
||||
// on purpose or accidentally. Therefore, we ignore this error to not
|
||||
// prevent the remaining chunk to be stored on S3.
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
err = nil
|
||||
}
|
||||
|
||||
// io.Copy does not return io.EOF, so we not have to handle it differently.
|
||||
if err != nil {
|
||||
return bytesUploaded, err
|
||||
}
|
||||
// If io.Copy is finished reading, it will always return (0, nil).
|
||||
if n == 0 {
|
||||
return (bytesUploaded - incompletePartSize), nil
|
||||
}
|
||||
|
||||
// Seek to the beginning of the file
|
||||
file.Seek(0, 0)
|
||||
n := stat.Size()
|
||||
|
||||
isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n)
|
||||
if n >= store.MinPartSize || isFinalChunk {
|
||||
_, err = store.Service.UploadPartWithContext(ctx, &s3.UploadPartInput{
|
||||
uploadPartInput := &s3.UploadPartInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: store.keyWithPrefix(uploadId),
|
||||
UploadId: aws.String(multipartId),
|
||||
PartNumber: aws.Int64(nextPartNum),
|
||||
Body: file,
|
||||
})
|
||||
if err != nil {
|
||||
}
|
||||
if err := upload.putPartForUpload(ctx, uploadPartInput, file); err != nil {
|
||||
return bytesUploaded, err
|
||||
}
|
||||
} else {
|
||||
|
@ -374,6 +445,20 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
|
|||
bytesUploaded += n
|
||||
nextPartNum += 1
|
||||
}
|
||||
|
||||
return bytesUploaded - incompletePartSize, partProducer.err
|
||||
}
|
||||
|
||||
func cleanUpTempFile(file *os.File) {
|
||||
file.Close()
|
||||
os.Remove(file.Name())
|
||||
}
|
||||
|
||||
func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File) error {
|
||||
defer cleanUpTempFile(file)
|
||||
|
||||
_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
|
||||
return err
|
||||
}
|
||||
|
||||
func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
|
||||
|
@ -643,13 +728,11 @@ func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads
|
|||
uploadId, multipartId := splitIds(id)
|
||||
|
||||
// Create a temporary file for holding the concatenated data
|
||||
file, err := ioutil.TempFile("", "tusd-s3-concat-tmp-")
|
||||
file, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-concat-tmp-")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println(file.Name())
|
||||
defer os.Remove(file.Name())
|
||||
defer file.Close()
|
||||
defer cleanUpTempFile(file)
|
||||
|
||||
// Download each part and append it to the temporary file
|
||||
for _, partialUpload := range partialUploads {
|
||||
|
@ -790,7 +873,7 @@ func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, upload
|
|||
}
|
||||
defer incompleteUploadObject.Body.Close()
|
||||
|
||||
partFile, err := ioutil.TempFile("", "tusd-s3-tmp-")
|
||||
partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-")
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
@ -824,11 +907,13 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st
|
|||
return obj, err
|
||||
}
|
||||
|
||||
func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, r io.ReadSeeker) error {
|
||||
func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
|
||||
defer cleanUpTempFile(file)
|
||||
|
||||
_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: store.metadataKeyWithPrefix(uploadId + ".part"),
|
||||
Body: r,
|
||||
Body: file,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
package s3store
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type InfiniteZeroReader struct{}
|
||||
|
||||
func (izr InfiniteZeroReader) Read(b []byte) (int, error) {
|
||||
b[0] = 0
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
type ErrorReader struct{}
|
||||
|
||||
func (ErrorReader) Read(b []byte) (int, error) {
|
||||
return 0, errors.New("error from ErrorReader")
|
||||
}
|
||||
|
||||
func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
|
||||
fileChan := make(chan *os.File)
|
||||
doneChan := make(chan struct{})
|
||||
expectedStr := "test"
|
||||
r := strings.NewReader(expectedStr)
|
||||
pp := s3PartProducer{
|
||||
store: &S3Store{},
|
||||
done: doneChan,
|
||||
files: fileChan,
|
||||
r: r,
|
||||
}
|
||||
go pp.produce(1)
|
||||
|
||||
actualStr := ""
|
||||
b := make([]byte, 1)
|
||||
for f := range fileChan {
|
||||
n, err := f.Read(b)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatalf("incorrect number of bytes read: wanted %d, got %d", 1, n)
|
||||
}
|
||||
actualStr += string(b)
|
||||
|
||||
os.Remove(f.Name())
|
||||
f.Close()
|
||||
}
|
||||
|
||||
if actualStr != expectedStr {
|
||||
t.Errorf("incorrect string read from channel: wanted %s, got %s", expectedStr, actualStr)
|
||||
}
|
||||
|
||||
if pp.err != nil {
|
||||
t.Errorf("unexpected error from part producer: %s", pp.err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPartProducerExitsWhenDoneChannelIsClosed(t *testing.T) {
|
||||
fileChan := make(chan *os.File)
|
||||
doneChan := make(chan struct{})
|
||||
pp := s3PartProducer{
|
||||
store: &S3Store{},
|
||||
done: doneChan,
|
||||
files: fileChan,
|
||||
r: InfiniteZeroReader{},
|
||||
}
|
||||
|
||||
completedChan := make(chan struct{})
|
||||
go func() {
|
||||
pp.produce(10)
|
||||
completedChan <- struct{}{}
|
||||
}()
|
||||
|
||||
close(doneChan)
|
||||
|
||||
select {
|
||||
case <-completedChan:
|
||||
// producer exited cleanly
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("timed out waiting for producer to exit")
|
||||
}
|
||||
|
||||
safelyDrainChannelOrFail(fileChan, t)
|
||||
}
|
||||
|
||||
func TestPartProducerExitsWhenDoneChannelIsClosedBeforeAnyPartIsSent(t *testing.T) {
|
||||
fileChan := make(chan *os.File)
|
||||
doneChan := make(chan struct{})
|
||||
pp := s3PartProducer{
|
||||
store: &S3Store{},
|
||||
done: doneChan,
|
||||
files: fileChan,
|
||||
r: InfiniteZeroReader{},
|
||||
}
|
||||
|
||||
close(doneChan)
|
||||
|
||||
completedChan := make(chan struct{})
|
||||
go func() {
|
||||
pp.produce(10)
|
||||
completedChan <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-completedChan:
|
||||
// producer exited cleanly
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("timed out waiting for producer to exit")
|
||||
}
|
||||
|
||||
safelyDrainChannelOrFail(fileChan, t)
|
||||
}
|
||||
|
||||
func TestPartProducerExitsWhenUnableToReadFromFile(t *testing.T) {
|
||||
fileChan := make(chan *os.File)
|
||||
doneChan := make(chan struct{})
|
||||
pp := s3PartProducer{
|
||||
store: &S3Store{},
|
||||
done: doneChan,
|
||||
files: fileChan,
|
||||
r: ErrorReader{},
|
||||
}
|
||||
|
||||
completedChan := make(chan struct{})
|
||||
go func() {
|
||||
pp.produce(10)
|
||||
completedChan <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-completedChan:
|
||||
// producer exited cleanly
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Error("timed out waiting for producer to exit")
|
||||
}
|
||||
|
||||
safelyDrainChannelOrFail(fileChan, t)
|
||||
|
||||
if pp.err == nil {
|
||||
t.Error("expected an error but didn't get one")
|
||||
}
|
||||
}
|
||||
|
||||
func safelyDrainChannelOrFail(c chan *os.File, t *testing.T) {
|
||||
// At this point, we've signaled that the producer should exit, but it may write a few files
|
||||
// into the channel before closing it and exiting. Make sure that we get a nil value
|
||||
// eventually.
|
||||
for i := 0; i < 100; i++ {
|
||||
if f := <-c; f == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
t.Fatal("timed out waiting for channel to drain")
|
||||
}
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/tus/tusd/pkg/handler"
|
||||
)
|
||||
|
@ -691,13 +693,12 @@ func TestWriteChunk(t *testing.T) {
|
|||
store.MaxMultipartParts = 10000
|
||||
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.info"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
|
||||
}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
|
@ -712,30 +713,17 @@ func TestWriteChunk(t *testing.T) {
|
|||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}, nil).Times(2)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)),
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
Size: aws.Int64(100),
|
||||
},
|
||||
{
|
||||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil))
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
|
@ -787,13 +775,12 @@ func TestWriteChunkWithUnexpectedEOF(t *testing.T) {
|
|||
store.MaxMultipartParts = 10000
|
||||
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.info"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
|
||||
}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
|
@ -808,36 +795,20 @@ func TestWriteChunkWithUnexpectedEOF(t *testing.T) {
|
|||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}, nil).Times(2)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)),
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
Size: aws.Int64(100),
|
||||
},
|
||||
{
|
||||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil))
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
|
||||
s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
Body: bytes.NewReader([]byte("1234567890ABCD")),
|
||||
})).Return(nil, nil),
|
||||
)
|
||||
})).Return(nil, nil)
|
||||
|
||||
reader, writer := io.Pipe()
|
||||
|
||||
|
@ -862,13 +833,12 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) {
|
|||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := New("bucket", s3obj)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.info"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
|
||||
}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
|
@ -883,26 +853,13 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) {
|
|||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}, nil).Times(2)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil)),
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
Size: aws.Int64(100),
|
||||
},
|
||||
{
|
||||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil))
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
|
@ -934,43 +891,38 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) {
|
|||
store.MaxMultipartParts = 10000
|
||||
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.info"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
|
||||
}, nil),
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
ContentLength: aws.Int64(3),
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
|
||||
}, nil),
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
ContentLength: aws.Int64(3),
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
|
||||
}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.DeleteObjectOutput{}, nil),
|
||||
}).Return(&s3.DeleteObjectOutput{}, nil)
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
|
@ -1007,43 +959,31 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing.
|
|||
store.MaxMultipartParts = 10000
|
||||
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.info"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":10,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
|
||||
}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
|
||||
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
ContentLength: aws.Int64(3),
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
|
||||
}, nil),
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
|
||||
ContentLength: aws.Int64(3),
|
||||
}, nil),
|
||||
}, nil).Times(2)
|
||||
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.DeleteObjectOutput{}, nil),
|
||||
}).Return(&s3.DeleteObjectOutput{}, nil)
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
|
@ -1075,13 +1015,12 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
|
|||
store := New("bucket", s3obj)
|
||||
store.MinPartSize = 20
|
||||
|
||||
gomock.InOrder(
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.info"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
|
||||
}, nil),
|
||||
}, nil)
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
|
@ -1096,38 +1035,22 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
|
|||
Size: aws.Int64(90),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}, nil).Times(2)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil)),
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
Size: aws.Int64(400),
|
||||
},
|
||||
{
|
||||
Size: aws.Int64(90),
|
||||
},
|
||||
},
|
||||
}, nil),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil))
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
|
||||
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumber: aws.Int64(3),
|
||||
Body: bytes.NewReader([]byte("1234567890")),
|
||||
})).Return(nil, nil),
|
||||
)
|
||||
})).Return(nil, nil)
|
||||
|
||||
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
|
||||
assert.Nil(err)
|
||||
|
@ -1396,3 +1319,99 @@ func TestConcatUploadsUsingDownload(t *testing.T) {
|
|||
// Wait a short delay until the call to AbortMultipartUploadWithContext also occurs.
|
||||
<-time.After(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
type s3APIWithTempFileAssertion struct {
|
||||
*MockS3API
|
||||
assert *assert.Assertions
|
||||
tempDir string
|
||||
}
|
||||
|
||||
func (s s3APIWithTempFileAssertion) UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) {
|
||||
assert := s.assert
|
||||
|
||||
// Make sure that only the two temporary files from tusd are in here.
|
||||
files, err := ioutil.ReadDir(s.tempDir)
|
||||
assert.Nil(err)
|
||||
for _, file := range files {
|
||||
assert.True(strings.HasPrefix(file.Name(), "tusd-s3-tmp-"))
|
||||
}
|
||||
assert.Equal(len(files), 2)
|
||||
|
||||
return nil, fmt.Errorf("not now")
|
||||
}
|
||||
|
||||
// This test ensures that the S3Store will cleanup all files that it creates during
|
||||
// a call to WriteChunk, even if an error occurs during that invocation.
|
||||
// Here, we provide 14 bytes to WriteChunk and since the PartSize is set to 10,
|
||||
// it will split the input into two parts (10 bytes and 4 bytes).
|
||||
// Inside the first call to UploadPartWithContext, we assert that the temporary files
|
||||
// for both parts have been created and we return an error.
|
||||
// In the end, we assert that the error bubbled up and that all temporary files have
|
||||
// been cleaned up.
|
||||
func TestWriteChunkCleansUpTempFiles(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
// Create a temporary directory, so no files get mixed in.
|
||||
tempDir, err := ioutil.TempDir("", "tusd-s3-cleanup-tests-")
|
||||
assert.Nil(err)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
s3api := s3APIWithTempFileAssertion{
|
||||
MockS3API: s3obj,
|
||||
assert: assert,
|
||||
tempDir: tempDir,
|
||||
}
|
||||
store := New("bucket", s3api)
|
||||
store.MaxPartSize = 10
|
||||
store.MinPartSize = 10
|
||||
store.MaxMultipartParts = 10000
|
||||
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
|
||||
store.TemporaryDirectory = tempDir
|
||||
|
||||
// The usual S3 calls for retrieving the upload
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.info"),
|
||||
}).Return(&s3.GetObjectOutput{
|
||||
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
|
||||
}, nil)
|
||||
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
PartNumberMarker: aws.Int64(0),
|
||||
}).Return(&s3.ListPartsOutput{
|
||||
Parts: []*s3.Part{
|
||||
{
|
||||
Size: aws.Int64(100),
|
||||
},
|
||||
{
|
||||
Size: aws.Int64(200),
|
||||
},
|
||||
},
|
||||
}, nil).Times(2)
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil))
|
||||
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId.part"),
|
||||
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
|
||||
|
||||
// No calls to s3obj.EXPECT().UploadPartWithContext since that is handled by s3APIWithTempFileAssertion
|
||||
|
||||
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
|
||||
assert.Nil(err)
|
||||
|
||||
bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890ABCD")))
|
||||
assert.NotNil(err)
|
||||
assert.Equal(err.Error(), "not now")
|
||||
assert.Equal(int64(0), bytesRead)
|
||||
|
||||
files, err := ioutil.ReadDir(tempDir)
|
||||
assert.Nil(err)
|
||||
assert.Equal(len(files), 0)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue