s3store: Parallelize part uploads and information retrieval (#478)

* Add first draft of parallel upload queue

* s3store: Use queue for parallel uploads

* Revert "Add first draft of parallel upload queue"

This reverts commit 86a329cef2.

* Revert "s3store: Use queue for parallel uploads"

This reverts commit 29b59a2c90.

* s3store: Cache results from listing parts and checking incomplete object

* s3store: Remove debugging output`

* s3store: Make requests for fetching info concurrently

* s3store: Make parallel uploads work and tests pass

* s3store: Add semaphore package

* s3store: Add comments to semaphore package

* s3store: Encapsulate more logic into s3PartProducer

* s3store: Refactor WriteChunk

* s3store: Remove TODO

* s3store: Acquire lock before uploading

* cli: Add flag for setting concurrency limit

* s3store: One more comment
This commit is contained in:
Marius 2021-05-18 10:29:18 +02:00 committed by GitHub
parent d560c4e753
commit 8fd18364e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 773 additions and 569 deletions

View File

@ -27,6 +27,7 @@ var Flags struct {
S3PartSize int64
S3DisableContentHashes bool
S3DisableSSL bool
S3ConcurrentPartUploads int
GCSBucket string
GCSObjectPrefix string
EnabledHooksString string
@ -68,6 +69,7 @@ func ParseFlags() {
flag.Int64Var(&Flags.S3PartSize, "s3-part-size", 50*1024*1024, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)")
flag.BoolVar(&Flags.S3DisableContentHashes, "s3-disable-content-hashes", false, "Disable the calculation of MD5 and SHA256 hashes for the content that gets uploaded to S3 for minimized CPU usage (experimental and may be removed in the future)")
flag.BoolVar(&Flags.S3DisableSSL, "s3-disable-ssl", false, "Disable SSL and only use HTTP for communication with S3 (experimental and may be removed in the future)")
flag.IntVar(&Flags.S3ConcurrentPartUploads, "s3-concurrent-part-uploads", 10, "Number of concurrent part uploads to S3 (experimental and may be removed in the future)")
flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)")
flag.StringVar(&Flags.GCSObjectPrefix, "gcs-object-prefix", "", "Prefix for GCS object names (can't contain underscore character)")
flag.StringVar(&Flags.EnabledHooksString, "hooks-enabled-events", "pre-create,post-create,post-receive,post-terminate,post-finish", "Comma separated list of enabled hook events (e.g. post-create,post-finish). Leave empty to enable default events")

View File

@ -0,0 +1,20 @@
// Package semaphore implements a basic semaphore for coordinating and limiting
// non-exclusive, concurrent access.
package semaphore
type Semaphore chan struct{}
// New creates a semaphore with the given concurrency limit.
func New(concurrency int) Semaphore {
return make(chan struct{}, concurrency)
}
// Acquire will block until the semaphore can be acquired.
func (s Semaphore) Acquire() {
s <- struct{}{}
}
// Release frees the acquired slot in the semaphore.
func (s Semaphore) Release() {
<-s
}

View File

@ -83,6 +83,7 @@ import (
"sync"
"time"
"github.com/tus/tusd/internal/semaphore"
"github.com/tus/tusd/internal/uid"
"github.com/tus/tusd/pkg/handler"
@ -158,6 +159,9 @@ type S3Store struct {
// CPU, so it might be desirable to disable them.
// Note that this property is experimental and might be removed in the future!
DisableContentHashes bool
// uploadSemaphore limits the number of concurrent multipart part uploads to S3.
uploadSemaphore semaphore.Semaphore
}
type S3API interface {
@ -165,6 +169,7 @@ type S3API interface {
ListPartsWithContext(ctx context.Context, input *s3.ListPartsInput, opt ...request.Option) (*s3.ListPartsOutput, error)
UploadPartWithContext(ctx context.Context, input *s3.UploadPartInput, opt ...request.Option) (*s3.UploadPartOutput, error)
GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error)
HeadObjectWithContext(ctx context.Context, input *s3.HeadObjectInput, opt ...request.Option) (*s3.HeadObjectOutput, error)
CreateMultipartUploadWithContext(ctx context.Context, input *s3.CreateMultipartUploadInput, opt ...request.Option) (*s3.CreateMultipartUploadOutput, error)
AbortMultipartUploadWithContext(ctx context.Context, input *s3.AbortMultipartUploadInput, opt ...request.Option) (*s3.AbortMultipartUploadOutput, error)
DeleteObjectWithContext(ctx context.Context, input *s3.DeleteObjectInput, opt ...request.Option) (*s3.DeleteObjectOutput, error)
@ -189,9 +194,15 @@ func New(bucket string, service S3API) S3Store {
MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024,
MaxBufferedParts: 20,
TemporaryDirectory: "",
uploadSemaphore: semaphore.New(10),
}
}
// S3ConcurrentPartUploads changes the limit on how many concurrent part uploads to S3 are allowed.
func (store *S3Store) S3ConcurrentPartUploads(limit int) {
store.uploadSemaphore = semaphore.New(limit)
}
// UseIn sets this store as the core data store in the passed composer and adds
// all possible extension to it.
func (store S3Store) UseIn(composer *handler.StoreComposer) {
@ -209,6 +220,18 @@ type s3Upload struct {
// been fetched yet from S3. Never read or write to it directly but instead use
// the GetInfo and writeInfo functions.
info *handler.FileInfo
// parts collects all parts for this upload. It will be nil if info is nil as well.
parts []*s3Part
// incompletePartSize is the size of an incomplete part object, if one exists. It will be 0 if info is nil as well.
incompletePartSize int64
}
// s3Part represents a single part of a S3 multipart upload.
type s3Part struct {
number int64
size int64
etag string
}
func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
@ -253,7 +276,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
"Key": *store.keyWithPrefix(uploadId),
}
upload := &s3Upload{id, &store, nil}
upload := &s3Upload{id, &store, nil, []*s3Part{}, 0}
err = upload.writeInfo(ctx, info)
if err != nil {
return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err)
@ -263,7 +286,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
}
func (store S3Store) GetUpload(ctx context.Context, id string) (handler.Upload, error) {
return &s3Upload{id, &store, nil}, nil
return &s3Upload{id, &store, nil, []*s3Part{}, 0}, nil
}
func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
@ -302,14 +325,60 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er
return err
}
func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
func (upload *s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
id := upload.id
store := upload.store
uploadId, _ := splitIds(id)
// Get the total size of the current upload, number of parts to generate next number and whether
// an incomplete part exists
_, _, incompletePartSize, err := upload.getInternalInfo(ctx)
if err != nil {
return 0, err
}
if incompletePartSize > 0 {
incompletePartFile, err := store.downloadIncompletePartForUpload(ctx, uploadId)
if err != nil {
return 0, err
}
if incompletePartFile == nil {
return 0, fmt.Errorf("s3store: Expected an incomplete part file but did not get any")
}
defer cleanUpTempFile(incompletePartFile)
if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
return 0, err
}
// Prepend an incomplete part, if necessary and adapt the offset
src = io.MultiReader(incompletePartFile, src)
offset = offset - incompletePartSize
}
bytesUploaded, err := upload.uploadParts(ctx, offset, src)
// The size of the incomplete part should not be counted, because the
// process of the incomplete part should be fully transparent to the user.
bytesUploaded = bytesUploaded - incompletePartSize
if bytesUploaded < 0 {
bytesUploaded = 0
}
upload.info.Offset += bytesUploaded
return bytesUploaded, err
}
func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Reader) (int64, error) {
id := upload.id
store := upload.store
uploadId, multipartId := splitIds(id)
// Get the total size of the current upload
info, err := upload.GetInfo(ctx)
// Get the total size of the current upload and number of parts to generate next number
info, parts, _, err := upload.getInternalInfo(ctx)
if err != nil {
return 0, err
}
@ -321,83 +390,77 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
return 0, err
}
// Get number of parts to generate next number
parts, err := store.listAllParts(ctx, id)
if err != nil {
return 0, err
}
numParts := len(parts)
nextPartNum := int64(numParts + 1)
incompletePartFile, incompletePartSize, err := store.downloadIncompletePartForUpload(ctx, uploadId)
if err != nil {
return 0, err
}
if incompletePartFile != nil {
defer cleanUpTempFile(incompletePartFile)
if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
return 0, err
}
src = io.MultiReader(incompletePartFile, src)
}
fileChan := make(chan *os.File, store.MaxBufferedParts)
doneChan := make(chan struct{})
defer close(doneChan)
// If we panic or return while there are still files in the channel, then
// we may leak file descriptors. Let's ensure that those are cleaned up.
defer func() {
for file := range fileChan {
cleanUpTempFile(file)
}
}()
partProducer := s3PartProducer{
store: store,
done: doneChan,
files: fileChan,
r: src,
}
partProducer, fileChan := newS3PartProducer(src, store.MaxBufferedParts, store.TemporaryDirectory)
defer partProducer.stop()
go partProducer.produce(optimalPartSize)
for file := range fileChan {
stat, err := file.Stat()
if err != nil {
return 0, err
}
n := stat.Size()
var wg sync.WaitGroup
var uploadErr error
for fileChunk := range fileChan {
partfile := fileChunk.file
partsize := fileChunk.size
isFinalChunk := !info.SizeIsDeferred && (size == offset+bytesUploaded+partsize)
if partsize >= store.MinPartSize || isFinalChunk {
part := &s3Part{
etag: "",
size: partsize,
number: nextPartNum,
}
upload.parts = append(upload.parts, part)
wg.Add(1)
// We acquire the semaphore before starting the goroutine to avoid
// starting many goroutines, most of which are just waiting for the lock.
upload.store.uploadSemaphore.Acquire()
go func(file *os.File, part *s3Part) {
defer upload.store.uploadSemaphore.Release()
defer wg.Done()
isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n)
if n >= store.MinPartSize || isFinalChunk {
uploadPartInput := &s3.UploadPartInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId),
UploadId: aws.String(multipartId),
PartNumber: aws.Int64(nextPartNum),
}
if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil {
return bytesUploaded, err
PartNumber: aws.Int64(part.number),
}
etag, err := upload.putPartForUpload(ctx, uploadPartInput, file, part.size)
if err != nil {
uploadErr = err
} else {
part.etag = etag
}
}(partfile, part)
} else {
wg.Add(1)
// We acquire the semaphore before starting the goroutine to avoid
// starting many goroutines, most of which are just waiting for the lock.
upload.store.uploadSemaphore.Acquire()
go func(file *os.File) {
defer upload.store.uploadSemaphore.Release()
defer wg.Done()
if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil {
return bytesUploaded, err
uploadErr = err
}
upload.incompletePartSize = partsize
}(partfile)
}
bytesUploaded += n
return (bytesUploaded - incompletePartSize), nil
}
offset += n
bytesUploaded += n
bytesUploaded += partsize
nextPartNum += 1
}
return bytesUploaded - incompletePartSize, partProducer.err
wg.Wait()
if uploadErr != nil {
return 0, uploadErr
}
return bytesUploaded, partProducer.err
}
func cleanUpTempFile(file *os.File) {
@ -405,14 +468,17 @@ func cleanUpTempFile(file *os.File) {
os.Remove(file.Name())
}
func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) error {
func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) (string, error) {
defer cleanUpTempFile(file)
if !upload.store.DisableContentHashes {
// By default, use the traditional approach to upload data
uploadPartInput.Body = file
_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
return err
res, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
if err != nil {
return "", err
}
return *res.ETag, nil
} else {
// Experimental feature to prevent the AWS SDK from calculating the SHA256 hash
// for the parts we upload to S3.
@ -420,19 +486,19 @@ func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s
// on our own. This way, the body is not included in the SHA256 calculation.
s3api, ok := upload.store.Service.(s3APIForPresigning)
if !ok {
return fmt.Errorf("s3store: failed to cast S3 service for presigning")
return "", fmt.Errorf("s3store: failed to cast S3 service for presigning")
}
s3Req, _ := s3api.UploadPartRequest(uploadPartInput)
url, err := s3Req.Presign(15 * time.Minute)
if err != nil {
return err
return "", err
}
req, err := http.NewRequest("PUT", url, file)
if err != nil {
return err
return "", err
}
// Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked,
@ -441,89 +507,122 @@ func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
return "", err
}
defer res.Body.Close()
if res.StatusCode != 200 {
buf := new(strings.Builder)
io.Copy(buf, res.Body)
return fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String())
return "", fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String())
}
return nil
return res.Header.Get("ETag"), nil
}
}
func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
info, _, _, err = upload.getInternalInfo(ctx)
return info, err
}
func (upload *s3Upload) getInternalInfo(ctx context.Context) (info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) {
if upload.info != nil {
return *upload.info, nil
return *upload.info, upload.parts, upload.incompletePartSize, nil
}
info, err = upload.fetchInfo(ctx)
info, parts, incompletePartSize, err = upload.fetchInfo(ctx)
if err != nil {
return info, err
return info, parts, incompletePartSize, err
}
upload.info = &info
return info, nil
upload.parts = parts
upload.incompletePartSize = incompletePartSize
return info, parts, incompletePartSize, nil
}
func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, err error) {
func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) {
id := upload.id
store := upload.store
uploadId, _ := splitIds(id)
var wg sync.WaitGroup
wg.Add(3)
// We store all errors in here and handle them all together once the wait
// group is done.
var infoErr error
var partsErr error
var incompletePartSizeErr error
go func() {
defer wg.Done()
// Get file info stored in separate object
res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
var res *s3.GetObjectOutput
res, infoErr = store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".info"),
})
if err != nil {
if isAwsError(err, "NoSuchKey") {
return info, handler.ErrNotFound
if infoErr == nil {
infoErr = json.NewDecoder(res.Body).Decode(&info)
}
}()
return info, err
}
if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
return info, err
}
go func() {
defer wg.Done()
// Get uploaded parts and their offset
parts, err := store.listAllParts(ctx, id)
if err != nil {
// Check if the error is caused by the upload not being found. This happens
parts, partsErr = store.listAllParts(ctx, id)
}()
go func() {
defer wg.Done()
// Get size of optional incomplete part file.
incompletePartSize, incompletePartSizeErr = store.headIncompletePartForUpload(ctx, uploadId)
}()
wg.Wait()
// Finally, after all requests are complete, let's handle the errors
if infoErr != nil {
err = infoErr
// If the info file is not found, we consider the upload to be non-existant
if isAwsError(err, "NoSuchKey") {
err = handler.ErrNotFound
}
return
}
if partsErr != nil {
err = partsErr
// Check if the error is caused by the multipart upload not being found. This happens
// when the multipart upload has already been completed or aborted. Since
// we already found the info object, we know that the upload has been
// completed and therefore can ensure the the offset is the size.
if isAwsError(err, "NoSuchUpload") {
info.Offset = info.Size
return info, nil
} else {
return info, err
err = nil
}
return
}
offset := int64(0)
if incompletePartSizeErr != nil {
err = incompletePartSizeErr
return
}
// The offset is the sum of all part sizes and the size of the incomplete part file.
offset := incompletePartSize
for _, part := range parts {
offset += *part.Size
}
incompletePartObject, err := store.getIncompletePartForUpload(ctx, uploadId)
if err != nil {
return info, err
}
if incompletePartObject != nil {
defer incompletePartObject.Body.Close()
offset += *incompletePartObject.ContentLength
offset += part.size
}
info.Offset = offset
return
return info, parts, incompletePartSize, nil
}
func (upload s3Upload) GetReader(ctx context.Context) (io.Reader, error) {
@ -640,7 +739,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
uploadId, multipartId := splitIds(id)
// Get uploaded parts
parts, err := store.listAllParts(ctx, id)
_, parts, _, err := upload.getInternalInfo(ctx)
if err != nil {
return err
}
@ -660,10 +759,11 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
return err
}
parts = []*s3.Part{
&s3.Part{
ETag: res.ETag,
PartNumber: aws.Int64(1),
parts = []*s3Part{
&s3Part{
etag: *res.ETag,
number: 1,
size: 0,
},
}
@ -675,8 +775,8 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
for index, part := range parts {
completedParts[index] = &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ETag: aws.String(part.etag),
PartNumber: aws.Int64(part.number),
}
}
@ -790,10 +890,16 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads
partialS3Upload := partialUpload.(*s3Upload)
partialId, _ := splitIds(partialS3Upload.id)
upload.parts = append(upload.parts, &s3Part{
number: int64(i + 1),
size: -1,
etag: "",
})
go func(i int, partialId string) {
defer wg.Done()
_, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
res, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId),
UploadId: aws.String(multipartId),
@ -806,6 +912,8 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads
errs = append(errs, err)
return
}
upload.parts[i].etag = *res.CopyPartResult.ETag
}(i, partialId)
}
@ -829,7 +937,7 @@ func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error {
return upload.writeInfo(ctx, info)
}
func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.Part, err error) {
func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Part, err error) {
uploadId, multipartId := splitIds(id)
partMarker := int64(0)
@ -845,7 +953,14 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.P
return nil, err
}
parts = append(parts, (*listPtr).Parts...)
// TODO: Find more efficient way when appending many elements
for _, part := range (*listPtr).Parts {
parts = append(parts, &s3Part{
number: *part.PartNumber,
size: *part.Size,
etag: *part.ETag,
})
}
if listPtr.IsTruncated != nil && *listPtr.IsTruncated {
partMarker = *listPtr.NextPartNumberMarker
@ -856,36 +971,36 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.P
return parts, nil
}
func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, int64, error) {
func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, error) {
incompleteUploadObject, err := store.getIncompletePartForUpload(ctx, uploadId)
if err != nil {
return nil, 0, err
return nil, err
}
if incompleteUploadObject == nil {
// We did not find an incomplete upload
return nil, 0, nil
return nil, nil
}
defer incompleteUploadObject.Body.Close()
partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-")
if err != nil {
return nil, 0, err
return nil, err
}
n, err := io.Copy(partFile, incompleteUploadObject.Body)
if err != nil {
return nil, 0, err
return nil, err
}
if n < *incompleteUploadObject.ContentLength {
return nil, 0, errors.New("short read of incomplete upload")
return nil, errors.New("short read of incomplete upload")
}
_, err = partFile.Seek(0, 0)
if err != nil {
return nil, 0, err
return nil, err
}
return partFile, n, nil
return partFile, nil
}
func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId string) (*s3.GetObjectOutput, error) {
@ -901,6 +1016,22 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st
return obj, err
}
func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId string) (int64, error) {
obj, err := store.Service.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".part"),
})
if err != nil {
if isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "NotFound") || isAwsError(err, "AccessDenied") {
err = nil
}
return 0, err
}
return *obj.ContentLength, nil
}
func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
defer cleanUpTempFile(file)

View File

@ -6,10 +6,11 @@ package s3store
import (
context "context"
reflect "reflect"
request "github.com/aws/aws-sdk-go/aws/request"
s3 "github.com/aws/aws-sdk-go/service/s3"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockS3API is a mock of S3API interface
@ -155,6 +156,26 @@ func (mr *MockS3APIMockRecorder) GetObjectWithContext(arg0, arg1 interface{}, ar
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectWithContext", reflect.TypeOf((*MockS3API)(nil).GetObjectWithContext), varargs...)
}
// HeadObjectWithContext mocks base method
func (m *MockS3API) HeadObjectWithContext(arg0 context.Context, arg1 *s3.HeadObjectInput, arg2 ...request.Option) (*s3.HeadObjectOutput, error) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "HeadObjectWithContext", varargs...)
ret0, _ := ret[0].(*s3.HeadObjectOutput)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// HeadObjectWithContext indicates an expected call of HeadObjectWithContext
func (mr *MockS3APIMockRecorder) HeadObjectWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadObjectWithContext", reflect.TypeOf((*MockS3API)(nil).HeadObjectWithContext), varargs...)
}
// ListPartsWithContext mocks base method
func (m *MockS3API) ListPartsWithContext(arg0 context.Context, arg1 *s3.ListPartsInput, arg2 ...request.Option) (*s3.ListPartsOutput, error) {
m.ctrl.T.Helper()

View File

@ -8,45 +8,79 @@ import (
// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
type s3PartProducer struct {
store *S3Store
files chan<- *os.File
tmpDir string
files chan fileChunk
done chan struct{}
err error
r io.Reader
}
func (spp *s3PartProducer) produce(partSize int64) {
for {
file, err := spp.nextPart(partSize)
if err != nil {
spp.err = err
close(spp.files)
return
type fileChunk struct {
file *os.File
size int64
}
func newS3PartProducer(source io.Reader, backlog int64, tmpDir string) (s3PartProducer, <-chan fileChunk) {
fileChan := make(chan fileChunk, backlog)
doneChan := make(chan struct{})
partProducer := s3PartProducer{
tmpDir: tmpDir,
done: doneChan,
files: fileChan,
r: source,
}
if file == nil {
close(spp.files)
return
return partProducer, fileChan
}
// stop should always be called by the consumer to ensure that the channels
// are properly closed and emptied.
func (spp *s3PartProducer) stop() {
close(spp.done)
// If we return while there are still files in the channel, then
// we may leak file descriptors. Let's ensure that those are cleaned up.
for fileChunk := range spp.files {
cleanUpTempFile(fileChunk.file)
}
}
func (spp *s3PartProducer) produce(partSize int64) {
outerloop:
for {
file, ok, err := spp.nextPart(partSize)
if err != nil {
// An error occured. Stop producing.
spp.err = err
break
}
if !ok {
// The source was fully read. Stop producing.
break
}
select {
case spp.files <- file:
case <-spp.done:
// We are told to stop producing. Stop producing.
break outerloop
}
}
close(spp.files)
return
}
}
}
func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
// Create a temporary file to store the part
file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-")
file, err := ioutil.TempFile(spp.tmpDir, "tusd-s3-tmp-")
if err != nil {
return nil, err
return fileChunk{}, false, err
}
limitedReader := io.LimitReader(spp.r, size)
n, err := io.Copy(file, limitedReader)
if err != nil {
return nil, err
return fileChunk{}, false, err
}
// If the entire request body is read and no more data is available,
@ -54,11 +88,14 @@ func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
// case, we can close the s3PartProducer.
if n == 0 {
cleanUpTempFile(file)
return nil, nil
return fileChunk{}, false, nil
}
// Seek to the beginning of the file
file.Seek(0, 0)
return file, nil
return fileChunk{
file: file,
size: n,
}, true, nil
}

View File

@ -22,32 +22,28 @@ func (ErrorReader) Read(b []byte) (int, error) {
}
func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
expectedStr := "test"
r := strings.NewReader(expectedStr)
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: r,
}
pp, fileChan := newS3PartProducer(r, 0, "")
go pp.produce(1)
actualStr := ""
b := make([]byte, 1)
for f := range fileChan {
n, err := f.Read(b)
for chunk := range fileChan {
n, err := chunk.file.Read(b)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n != 1 {
t.Fatalf("incorrect number of bytes read: wanted %d, got %d", 1, n)
}
if chunk.size != 1 {
t.Fatalf("incorrect number of bytes in struct: wanted %d, got %d", 1, chunk.size)
}
actualStr += string(b)
os.Remove(f.Name())
f.Close()
os.Remove(chunk.file.Name())
chunk.file.Close()
}
if actualStr != expectedStr {
@ -59,15 +55,8 @@ func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
}
}
func TestPartProducerExitsWhenDoneChannelIsClosed(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: InfiniteZeroReader{},
}
func TestPartProducerExitsWhenProducerIsStopped(t *testing.T) {
pp, fileChan := newS3PartProducer(InfiniteZeroReader{}, 0, "")
completedChan := make(chan struct{})
go func() {
@ -75,35 +64,7 @@ func TestPartProducerExitsWhenDoneChannelIsClosed(t *testing.T) {
completedChan <- struct{}{}
}()
close(doneChan)
select {
case <-completedChan:
// producer exited cleanly
case <-time.After(2 * time.Second):
t.Error("timed out waiting for producer to exit")
}
safelyDrainChannelOrFail(fileChan, t)
}
func TestPartProducerExitsWhenDoneChannelIsClosedBeforeAnyPartIsSent(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: InfiniteZeroReader{},
}
close(doneChan)
completedChan := make(chan struct{})
go func() {
pp.produce(10)
completedChan <- struct{}{}
}()
pp.stop()
select {
case <-completedChan:
@ -116,14 +77,7 @@ func TestPartProducerExitsWhenDoneChannelIsClosedBeforeAnyPartIsSent(t *testing.
}
func TestPartProducerExitsWhenUnableToReadFromFile(t *testing.T) {
fileChan := make(chan *os.File)
doneChan := make(chan struct{})
pp := s3PartProducer{
store: &S3Store{},
done: doneChan,
files: fileChan,
r: ErrorReader{},
}
pp, fileChan := newS3PartProducer(ErrorReader{}, 0, "")
completedChan := make(chan struct{})
go func() {
@ -145,12 +99,12 @@ func TestPartProducerExitsWhenUnableToReadFromFile(t *testing.T) {
}
}
func safelyDrainChannelOrFail(c chan *os.File, t *testing.T) {
func safelyDrainChannelOrFail(c <-chan fileChunk, t *testing.T) {
// At this point, we've signaled that the producer should exit, but it may write a few files
// into the channel before closing it and exiting. Make sure that we get a nil value
// eventually.
for i := 0; i < 100; i++ {
if f := <-c; f == nil {
if _, more := <-c; !more {
return
}
}

View File

@ -171,6 +171,9 @@ func TestNewUploadWithMetadataObjectPrefix(t *testing.T) {
assert.NotNil(upload)
}
// This test ensures that an newly created upload without any chunks can be
// directly finished. There are no calls to ListPart or HeadObject because
// the upload is not fetched from S3 first.
func TestEmptyUpload(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
@ -193,14 +196,6 @@ func TestEmptyUpload(t *testing.T) {
Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)),
ContentLength: aws.Int64(int64(208)),
}),
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{},
}, nil),
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -272,6 +267,17 @@ func TestGetInfoNotFound(t *testing.T) {
Key: aws.String("uploadId.info"),
}).Return(nil, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(nil, awserr.New("NoSuchUpload", "Not found", nil))
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(nil, awserr.New("NoSuchKey", "Not found", nil))
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -287,13 +293,12 @@ func TestGetInfo(t *testing.T) {
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
gomock.InOrder(
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -302,15 +307,20 @@ func TestGetInfo(t *testing.T) {
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
PartNumber: aws.Int64(1),
Size: aws.Int64(100),
ETag: aws.String("etag-1"),
},
{
PartNumber: aws.Int64(2),
Size: aws.Int64(200),
ETag: aws.String("etag-2"),
},
},
NextPartNumberMarker: aws.Int64(2),
// Simulate a truncated response, so s3store should send a second request
IsTruncated: aws.Bool(true),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -319,15 +329,16 @@ func TestGetInfo(t *testing.T) {
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
PartNumber: aws.Int64(3),
Size: aws.Int64(100),
ETag: aws.String("etag-3"),
},
},
}, nil),
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)),
)
}).Return(nil, awserr.New("NoSuchKey", "Not found", nil))
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -353,13 +364,12 @@ func TestGetInfoWithMetadataObjectPrefix(t *testing.T) {
store := New("bucket", s3obj)
store.MetadataObjectPrefix = "my/metadata"
gomock.InOrder(
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("my/metadata/uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -368,15 +378,20 @@ func TestGetInfoWithMetadataObjectPrefix(t *testing.T) {
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
PartNumber: aws.Int64(1),
Size: aws.Int64(100),
ETag: aws.String("etag-1"),
},
{
PartNumber: aws.Int64(2),
Size: aws.Int64(200),
ETag: aws.String("etag-2"),
},
},
NextPartNumberMarker: aws.Int64(2),
// Simulate a truncated response, so s3store should send a second request
IsTruncated: aws.Bool(true),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -385,15 +400,16 @@ func TestGetInfoWithMetadataObjectPrefix(t *testing.T) {
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
PartNumber: aws.Int64(3),
Size: aws.Int64(100),
ETag: aws.String("etag-3"),
},
},
}, nil),
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("my/metadata/uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)),
)
}).Return(nil, awserr.New("NoSuchKey", "Not found", nil))
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -418,27 +434,24 @@ func TestGetInfoWithIncompletePart(t *testing.T) {
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
gomock.InOrder(
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil),
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{
}).Return(&s3.HeadObjectOutput{
ContentLength: aws.Int64(10),
Body: ioutil.NopCloser(bytes.NewReader([]byte("0123456789"))),
}, nil),
)
}, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -457,20 +470,22 @@ func TestGetInfoFinished(t *testing.T) {
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
gomock.InOrder(
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)),
)
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil))
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(nil, awserr.New("NoSuchKey", "Not found", nil))
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -572,13 +587,12 @@ func TestDeclareLength(t *testing.T) {
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
gomock.InOrder(
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`))),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -586,18 +600,17 @@ func TestDeclareLength(t *testing.T) {
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{},
}, nil),
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(nil, awserr.New("NotFound", "Not Found", nil)),
}).Return(nil, awserr.New("NotFound", "Not Found", nil))
s3obj.EXPECT().PutObjectWithContext(context.Background(), &s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)),
ContentLength: aws.Int64(int64(208)),
}),
)
})
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -617,7 +630,12 @@ func TestFinishUpload(t *testing.T) {
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
gomock.InOrder(
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":400,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -627,18 +645,18 @@ func TestFinishUpload(t *testing.T) {
Parts: []*s3.Part{
{
Size: aws.Int64(100),
ETag: aws.String("foo"),
ETag: aws.String("etag-1"),
PartNumber: aws.Int64(1),
},
{
Size: aws.Int64(200),
ETag: aws.String("bar"),
ETag: aws.String("etag-2"),
PartNumber: aws.Int64(2),
},
},
NextPartNumberMarker: aws.Int64(2),
IsTruncated: aws.Bool(true),
}, nil),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -648,11 +666,15 @@ func TestFinishUpload(t *testing.T) {
Parts: []*s3.Part{
{
Size: aws.Int64(100),
ETag: aws.String("foobar"),
ETag: aws.String("etag-3"),
PartNumber: aws.Int64(3),
},
},
}, nil),
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(nil, awserr.New("NotFound", "Not Found", nil))
s3obj.EXPECT().CompleteMultipartUploadWithContext(context.Background(), &s3.CompleteMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -660,21 +682,20 @@ func TestFinishUpload(t *testing.T) {
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: []*s3.CompletedPart{
{
ETag: aws.String("foo"),
ETag: aws.String("etag-1"),
PartNumber: aws.Int64(1),
},
{
ETag: aws.String("bar"),
ETag: aws.String("etag-2"),
PartNumber: aws.Int64(2),
},
{
ETag: aws.String("foobar"),
ETag: aws.String("etag-3"),
PartNumber: aws.Int64(3),
},
},
},
}).Return(nil, nil),
)
}).Return(nil, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -696,6 +717,7 @@ func TestWriteChunk(t *testing.T) {
store.MaxMultipartParts = 10000
store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024
// From GetInfo
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
@ -711,49 +733,54 @@ func TestWriteChunk(t *testing.T) {
Parts: []*s3.Part{
{
Size: aws.Int64(100),
ETag: aws.String("etag-1"),
PartNumber: aws.Int64(1),
},
{
Size: aws.Int64(200),
ETag: aws.String("etag-2"),
PartNumber: aws.Int64(2),
},
},
}, nil).Times(2)
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil))
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
}).Return(nil, awserr.New("NoSuchKey", "Not found", nil))
gomock.InOrder(
// From WriteChunk
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(3),
Body: bytes.NewReader([]byte("1234")),
})).Return(nil, nil),
})).Return(&s3.UploadPartOutput{
ETag: aws.String("etag-3"),
}, nil)
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(4),
Body: bytes.NewReader([]byte("5678")),
})).Return(nil, nil),
})).Return(&s3.UploadPartOutput{
ETag: aws.String("etag-4"),
}, nil)
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(5),
Body: bytes.NewReader([]byte("90AB")),
})).Return(nil, nil),
})).Return(&s3.UploadPartOutput{
ETag: aws.String("etag-5"),
}, nil)
s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
Body: bytes.NewReader([]byte("CD")),
})).Return(nil, nil),
)
})).Return(nil, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -786,28 +813,26 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) {
Parts: []*s3.Part{
{
Size: aws.Int64(100),
ETag: aws.String("etag-1"),
PartNumber: aws.Int64(1),
},
{
Size: aws.Int64(200),
ETag: aws.String("etag-2"),
PartNumber: aws.Int64(2),
},
},
}, nil).Times(2)
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil))
}).Return(nil, awserr.New("NoSuchKey", "The specified key does not exist", nil))
gomock.InOrder(
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)),
s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
Body: bytes.NewReader([]byte("1234567890")),
})).Return(nil, nil),
)
})).Return(nil, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -836,12 +861,19 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) {
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
}, nil)
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{},
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{
}).Return(&s3.HeadObjectOutput{
ContentLength: aws.Int64(3),
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
}, nil)
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
@ -854,29 +886,25 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) {
Bucket: aws.String(store.Bucket),
Key: aws.String("uploadId.part"),
}).Return(&s3.DeleteObjectOutput{}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2)
gomock.InOrder(
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(1),
Body: bytes.NewReader([]byte("1234")),
})).Return(nil, nil),
})).Return(&s3.UploadPartOutput{
ETag: aws.String("etag-1"),
}, nil)
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(2),
Body: bytes.NewReader([]byte("5")),
})).Return(nil, nil),
)
})).Return(&s3.UploadPartOutput{
ETag: aws.String("etag-2"),
}, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -910,33 +938,39 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing.
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2)
}).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.HeadObjectOutput{
ContentLength: aws.Int64(3),
}, nil)
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{
ContentLength: aws.Int64(3),
Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))),
}, nil).Times(2)
}, nil)
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String(store.Bucket),
Key: aws.String("uploadId.part"),
}).Return(&s3.DeleteObjectOutput{}, nil)
gomock.InOrder(
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(1),
Body: bytes.NewReader([]byte("1234")),
})).Return(nil, nil),
})).Return(&s3.UploadPartOutput{
ETag: aws.String("etag-1"),
}, nil)
s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
Body: bytes.NewReader([]byte("5")),
})).Return(nil, nil),
)
})).Return(nil, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -969,28 +1003,30 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
PartNumber: aws.Int64(1),
Size: aws.Int64(400),
ETag: aws.String("etag-1"),
},
{
PartNumber: aws.Int64(2),
Size: aws.Int64(90),
ETag: aws.String("etag-2"),
},
},
}, nil).Times(2)
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil))
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
}).Return(nil, awserr.New("AccessDenied", "Access Denied.", nil))
s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumber: aws.Int64(3),
Body: bytes.NewReader([]byte("1234567890")),
})).Return(nil, nil)
})).Return(&s3.UploadPartOutput{
ETag: aws.String("etag-3"),
}, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
@ -1101,13 +1137,33 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
store := New("bucket", s3obj)
store.MinPartSize = 100
// Calls from NewUpload
s3obj.EXPECT().CreateMultipartUploadWithContext(context.Background(), &s3.CreateMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
Metadata: map[string]*string{},
}).Return(&s3.CreateMultipartUploadOutput{
UploadId: aws.String("multipartId"),
}, nil)
s3obj.EXPECT().PutObjectWithContext(context.Background(), &s3.PutObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["aaa+AAA","bbb+BBB","ccc+CCC"],"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)),
ContentLength: aws.Int64(int64(234)),
})
// Calls from ConcatUploads
s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/aaa"),
PartNumber: aws.Int64(1),
}).Return(nil, nil)
}).Return(&s3.UploadPartCopyOutput{
CopyPartResult: &s3.CopyPartResult{
ETag: aws.String("etag-1"),
},
}, nil)
s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{
Bucket: aws.String("bucket"),
@ -1115,7 +1171,11 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/bbb"),
PartNumber: aws.Int64(2),
}).Return(nil, nil)
}).Return(&s3.UploadPartCopyOutput{
CopyPartResult: &s3.CopyPartResult{
ETag: aws.String("etag-2"),
},
}, nil)
s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{
Bucket: aws.String("bucket"),
@ -1123,31 +1183,13 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
UploadId: aws.String("multipartId"),
CopySource: aws.String("bucket/ccc"),
PartNumber: aws.Int64(3),
}).Return(nil, nil)
}).Return(&s3.UploadPartCopyOutput{
CopyPartResult: &s3.CopyPartResult{
ETag: aws.String("etag-3"),
},
}, nil)
// Output from s3Store.FinishUpload
gomock.InOrder(
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
ETag: aws.String("foo"),
PartNumber: aws.Int64(1),
},
{
ETag: aws.String("bar"),
PartNumber: aws.Int64(2),
},
{
ETag: aws.String("baz"),
PartNumber: aws.Int64(3),
},
},
}, nil),
// Calls from FinishUpload
s3obj.EXPECT().CompleteMultipartUploadWithContext(context.Background(), &s3.CompleteMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
@ -1155,23 +1197,31 @@ func TestConcatUploadsUsingMultipart(t *testing.T) {
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: []*s3.CompletedPart{
{
ETag: aws.String("foo"),
ETag: aws.String("etag-1"),
PartNumber: aws.Int64(1),
},
{
ETag: aws.String("bar"),
ETag: aws.String("etag-2"),
PartNumber: aws.Int64(2),
},
{
ETag: aws.String("baz"),
ETag: aws.String("etag-3"),
PartNumber: aws.Int64(3),
},
},
},
}).Return(nil, nil),
)
}).Return(nil, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
info := handler.FileInfo{
ID: "uploadId",
IsFinal: true,
PartialUploads: []string{
"aaa+AAA",
"bbb+BBB",
"ccc+CCC",
},
}
upload, err := store.NewUpload(context.Background(), info)
assert.Nil(err)
uploadA, err := store.GetUpload(context.Background(), "aaa+AAA")
@ -1269,13 +1319,13 @@ type s3APIWithTempFileAssertion struct {
func (s s3APIWithTempFileAssertion) UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) {
assert := s.assert
// Make sure that only the two temporary files from tusd are in here.
// Make sure that there are temporary files from tusd in here.
files, err := ioutil.ReadDir(s.tempDir)
assert.Nil(err)
for _, file := range files {
assert.True(strings.HasPrefix(file.Name(), "tusd-s3-tmp-"))
}
assert.Equal(len(files), 2)
assert.True(len(files) > 0)
return nil, fmt.Errorf("not now")
}
@ -1316,7 +1366,7 @@ func TestWriteChunkCleansUpTempFiles(t *testing.T) {
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.GetObjectOutput{
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":14,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))),
}, nil)
s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{
Bucket: aws.String("bucket"),
@ -1324,30 +1374,19 @@ func TestWriteChunkCleansUpTempFiles(t *testing.T) {
UploadId: aws.String("multipartId"),
PartNumberMarker: aws.Int64(0),
}).Return(&s3.ListPartsOutput{
Parts: []*s3.Part{
{
Size: aws.Int64(100),
},
{
Size: aws.Int64(200),
},
},
}, nil).Times(2)
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Parts: []*s3.Part{},
}, nil)
s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil))
s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
}).Return(nil, awserr.New("NoSuchKey", "Not found", nil))
// No calls to s3obj.EXPECT().UploadPartWithContext since that is handled by s3APIWithTempFileAssertion
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890ABCD")))
bytesRead, err := upload.WriteChunk(context.Background(), 0, bytes.NewReader([]byte("1234567890ABCD")))
assert.NotNil(err)
assert.Equal(err.Error(), "not now")
assert.Equal(int64(0), bytesRead)