diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index fdf9da9..ee91392 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -27,6 +27,7 @@ var Flags struct { S3PartSize int64 S3DisableContentHashes bool S3DisableSSL bool + S3ConcurrentPartUploads int GCSBucket string GCSObjectPrefix string EnabledHooksString string @@ -68,6 +69,7 @@ func ParseFlags() { flag.Int64Var(&Flags.S3PartSize, "s3-part-size", 50*1024*1024, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)") flag.BoolVar(&Flags.S3DisableContentHashes, "s3-disable-content-hashes", false, "Disable the calculation of MD5 and SHA256 hashes for the content that gets uploaded to S3 for minimized CPU usage (experimental and may be removed in the future)") flag.BoolVar(&Flags.S3DisableSSL, "s3-disable-ssl", false, "Disable SSL and only use HTTP for communication with S3 (experimental and may be removed in the future)") + flag.IntVar(&Flags.S3ConcurrentPartUploads, "s3-concurrent-part-uploads", 10, "Number of concurrent part uploads to S3 (experimental and may be removed in the future)") flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)") flag.StringVar(&Flags.GCSObjectPrefix, "gcs-object-prefix", "", "Prefix for GCS object names (can't contain underscore character)") flag.StringVar(&Flags.EnabledHooksString, "hooks-enabled-events", "pre-create,post-create,post-receive,post-terminate,post-finish", "Comma separated list of enabled hook events (e.g. post-create,post-finish). Leave empty to enable default events") diff --git a/internal/semaphore/semaphore.go b/internal/semaphore/semaphore.go new file mode 100644 index 0000000..3cf96fd --- /dev/null +++ b/internal/semaphore/semaphore.go @@ -0,0 +1,20 @@ +// Package semaphore implements a basic semaphore for coordinating and limiting +// non-exclusive, concurrent access. +package semaphore + +type Semaphore chan struct{} + +// New creates a semaphore with the given concurrency limit. +func New(concurrency int) Semaphore { + return make(chan struct{}, concurrency) +} + +// Acquire will block until the semaphore can be acquired. +func (s Semaphore) Acquire() { + s <- struct{}{} +} + +// Release frees the acquired slot in the semaphore. +func (s Semaphore) Release() { + <-s +} diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 4d5e900..da4944c 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -83,6 +83,7 @@ import ( "sync" "time" + "github.com/tus/tusd/internal/semaphore" "github.com/tus/tusd/internal/uid" "github.com/tus/tusd/pkg/handler" @@ -158,6 +159,9 @@ type S3Store struct { // CPU, so it might be desirable to disable them. // Note that this property is experimental and might be removed in the future! DisableContentHashes bool + + // uploadSemaphore limits the number of concurrent multipart part uploads to S3. + uploadSemaphore semaphore.Semaphore } type S3API interface { @@ -165,6 +169,7 @@ type S3API interface { ListPartsWithContext(ctx context.Context, input *s3.ListPartsInput, opt ...request.Option) (*s3.ListPartsOutput, error) UploadPartWithContext(ctx context.Context, input *s3.UploadPartInput, opt ...request.Option) (*s3.UploadPartOutput, error) GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) + HeadObjectWithContext(ctx context.Context, input *s3.HeadObjectInput, opt ...request.Option) (*s3.HeadObjectOutput, error) CreateMultipartUploadWithContext(ctx context.Context, input *s3.CreateMultipartUploadInput, opt ...request.Option) (*s3.CreateMultipartUploadOutput, error) AbortMultipartUploadWithContext(ctx context.Context, input *s3.AbortMultipartUploadInput, opt ...request.Option) (*s3.AbortMultipartUploadOutput, error) DeleteObjectWithContext(ctx context.Context, input *s3.DeleteObjectInput, opt ...request.Option) (*s3.DeleteObjectOutput, error) @@ -189,9 +194,15 @@ func New(bucket string, service S3API) S3Store { MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, MaxBufferedParts: 20, TemporaryDirectory: "", + uploadSemaphore: semaphore.New(10), } } +// S3ConcurrentPartUploads changes the limit on how many concurrent part uploads to S3 are allowed. +func (store *S3Store) S3ConcurrentPartUploads(limit int) { + store.uploadSemaphore = semaphore.New(limit) +} + // UseIn sets this store as the core data store in the passed composer and adds // all possible extension to it. func (store S3Store) UseIn(composer *handler.StoreComposer) { @@ -209,6 +220,18 @@ type s3Upload struct { // been fetched yet from S3. Never read or write to it directly but instead use // the GetInfo and writeInfo functions. info *handler.FileInfo + + // parts collects all parts for this upload. It will be nil if info is nil as well. + parts []*s3Part + // incompletePartSize is the size of an incomplete part object, if one exists. It will be 0 if info is nil as well. + incompletePartSize int64 +} + +// s3Part represents a single part of a S3 multipart upload. +type s3Part struct { + number int64 + size int64 + etag string } func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) { @@ -253,7 +276,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand "Key": *store.keyWithPrefix(uploadId), } - upload := &s3Upload{id, &store, nil} + upload := &s3Upload{id, &store, nil, []*s3Part{}, 0} err = upload.writeInfo(ctx, info) if err != nil { return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err) @@ -263,7 +286,7 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand } func (store S3Store) GetUpload(ctx context.Context, id string) (handler.Upload, error) { - return &s3Upload{id, &store, nil}, nil + return &s3Upload{id, &store, nil, []*s3Part{}, 0}, nil } func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload { @@ -302,14 +325,60 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er return err } -func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { +func (upload *s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { + id := upload.id + store := upload.store + + uploadId, _ := splitIds(id) + + // Get the total size of the current upload, number of parts to generate next number and whether + // an incomplete part exists + _, _, incompletePartSize, err := upload.getInternalInfo(ctx) + if err != nil { + return 0, err + } + + if incompletePartSize > 0 { + incompletePartFile, err := store.downloadIncompletePartForUpload(ctx, uploadId) + if err != nil { + return 0, err + } + if incompletePartFile == nil { + return 0, fmt.Errorf("s3store: Expected an incomplete part file but did not get any") + } + defer cleanUpTempFile(incompletePartFile) + + if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil { + return 0, err + } + + // Prepend an incomplete part, if necessary and adapt the offset + src = io.MultiReader(incompletePartFile, src) + offset = offset - incompletePartSize + } + + bytesUploaded, err := upload.uploadParts(ctx, offset, src) + + // The size of the incomplete part should not be counted, because the + // process of the incomplete part should be fully transparent to the user. + bytesUploaded = bytesUploaded - incompletePartSize + if bytesUploaded < 0 { + bytesUploaded = 0 + } + + upload.info.Offset += bytesUploaded + + return bytesUploaded, err +} + +func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Reader) (int64, error) { id := upload.id store := upload.store uploadId, multipartId := splitIds(id) - // Get the total size of the current upload - info, err := upload.GetInfo(ctx) + // Get the total size of the current upload and number of parts to generate next number + info, parts, _, err := upload.getInternalInfo(ctx) if err != nil { return 0, err } @@ -321,83 +390,77 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read return 0, err } - // Get number of parts to generate next number - parts, err := store.listAllParts(ctx, id) - if err != nil { - return 0, err - } - numParts := len(parts) nextPartNum := int64(numParts + 1) - incompletePartFile, incompletePartSize, err := store.downloadIncompletePartForUpload(ctx, uploadId) - if err != nil { - return 0, err - } - if incompletePartFile != nil { - defer cleanUpTempFile(incompletePartFile) - - if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil { - return 0, err - } - - src = io.MultiReader(incompletePartFile, src) - } - - fileChan := make(chan *os.File, store.MaxBufferedParts) - doneChan := make(chan struct{}) - defer close(doneChan) - - // If we panic or return while there are still files in the channel, then - // we may leak file descriptors. Let's ensure that those are cleaned up. - defer func() { - for file := range fileChan { - cleanUpTempFile(file) - } - }() - - partProducer := s3PartProducer{ - store: store, - done: doneChan, - files: fileChan, - r: src, - } + partProducer, fileChan := newS3PartProducer(src, store.MaxBufferedParts, store.TemporaryDirectory) + defer partProducer.stop() go partProducer.produce(optimalPartSize) - for file := range fileChan { - stat, err := file.Stat() - if err != nil { - return 0, err - } - n := stat.Size() + var wg sync.WaitGroup + var uploadErr error - isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n) - if n >= store.MinPartSize || isFinalChunk { - uploadPartInput := &s3.UploadPartInput{ - Bucket: aws.String(store.Bucket), - Key: store.keyWithPrefix(uploadId), - UploadId: aws.String(multipartId), - PartNumber: aws.Int64(nextPartNum), - } - if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil { - return bytesUploaded, err + for fileChunk := range fileChan { + partfile := fileChunk.file + partsize := fileChunk.size + + isFinalChunk := !info.SizeIsDeferred && (size == offset+bytesUploaded+partsize) + if partsize >= store.MinPartSize || isFinalChunk { + part := &s3Part{ + etag: "", + size: partsize, + number: nextPartNum, } + upload.parts = append(upload.parts, part) + + wg.Add(1) + // We acquire the semaphore before starting the goroutine to avoid + // starting many goroutines, most of which are just waiting for the lock. + upload.store.uploadSemaphore.Acquire() + go func(file *os.File, part *s3Part) { + defer upload.store.uploadSemaphore.Release() + defer wg.Done() + + uploadPartInput := &s3.UploadPartInput{ + Bucket: aws.String(store.Bucket), + Key: store.keyWithPrefix(uploadId), + UploadId: aws.String(multipartId), + PartNumber: aws.Int64(part.number), + } + etag, err := upload.putPartForUpload(ctx, uploadPartInput, file, part.size) + if err != nil { + uploadErr = err + } else { + part.etag = etag + } + }(partfile, part) } else { - if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil { - return bytesUploaded, err - } + wg.Add(1) + // We acquire the semaphore before starting the goroutine to avoid + // starting many goroutines, most of which are just waiting for the lock. + upload.store.uploadSemaphore.Acquire() + go func(file *os.File) { + defer upload.store.uploadSemaphore.Release() + defer wg.Done() - bytesUploaded += n - - return (bytesUploaded - incompletePartSize), nil + if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil { + uploadErr = err + } + upload.incompletePartSize = partsize + }(partfile) } - offset += n - bytesUploaded += n + bytesUploaded += partsize nextPartNum += 1 } - return bytesUploaded - incompletePartSize, partProducer.err + wg.Wait() + + if uploadErr != nil { + return 0, uploadErr + } + + return bytesUploaded, partProducer.err } func cleanUpTempFile(file *os.File) { @@ -405,14 +468,17 @@ func cleanUpTempFile(file *os.File) { os.Remove(file.Name()) } -func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) error { +func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) (string, error) { defer cleanUpTempFile(file) if !upload.store.DisableContentHashes { // By default, use the traditional approach to upload data uploadPartInput.Body = file - _, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput) - return err + res, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput) + if err != nil { + return "", err + } + return *res.ETag, nil } else { // Experimental feature to prevent the AWS SDK from calculating the SHA256 hash // for the parts we upload to S3. @@ -420,19 +486,19 @@ func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s // on our own. This way, the body is not included in the SHA256 calculation. s3api, ok := upload.store.Service.(s3APIForPresigning) if !ok { - return fmt.Errorf("s3store: failed to cast S3 service for presigning") + return "", fmt.Errorf("s3store: failed to cast S3 service for presigning") } s3Req, _ := s3api.UploadPartRequest(uploadPartInput) url, err := s3Req.Presign(15 * time.Minute) if err != nil { - return err + return "", err } req, err := http.NewRequest("PUT", url, file) if err != nil { - return err + return "", err } // Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked, @@ -441,89 +507,122 @@ func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s res, err := http.DefaultClient.Do(req) if err != nil { - return err + return "", err } defer res.Body.Close() if res.StatusCode != 200 { buf := new(strings.Builder) io.Copy(buf, res.Body) - return fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String()) + return "", fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String()) } - return nil + return res.Header.Get("ETag"), nil } } func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) { + info, _, _, err = upload.getInternalInfo(ctx) + return info, err +} + +func (upload *s3Upload) getInternalInfo(ctx context.Context) (info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) { if upload.info != nil { - return *upload.info, nil + return *upload.info, upload.parts, upload.incompletePartSize, nil } - info, err = upload.fetchInfo(ctx) + info, parts, incompletePartSize, err = upload.fetchInfo(ctx) if err != nil { - return info, err + return info, parts, incompletePartSize, err } upload.info = &info - return info, nil + upload.parts = parts + upload.incompletePartSize = incompletePartSize + return info, parts, incompletePartSize, nil } -func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, err error) { +func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, parts []*s3Part, incompletePartSize int64, err error) { id := upload.id store := upload.store uploadId, _ := splitIds(id) - // Get file info stored in separate object - res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{ - Bucket: aws.String(store.Bucket), - Key: store.metadataKeyWithPrefix(uploadId + ".info"), - }) - if err != nil { - if isAwsError(err, "NoSuchKey") { - return info, handler.ErrNotFound + var wg sync.WaitGroup + wg.Add(3) + + // We store all errors in here and handle them all together once the wait + // group is done. + var infoErr error + var partsErr error + var incompletePartSizeErr error + + go func() { + defer wg.Done() + + // Get file info stored in separate object + var res *s3.GetObjectOutput + res, infoErr = store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.metadataKeyWithPrefix(uploadId + ".info"), + }) + if infoErr == nil { + infoErr = json.NewDecoder(res.Body).Decode(&info) } + }() - return info, err + go func() { + defer wg.Done() + + // Get uploaded parts and their offset + parts, partsErr = store.listAllParts(ctx, id) + }() + + go func() { + defer wg.Done() + + // Get size of optional incomplete part file. + incompletePartSize, incompletePartSizeErr = store.headIncompletePartForUpload(ctx, uploadId) + }() + + wg.Wait() + + // Finally, after all requests are complete, let's handle the errors + if infoErr != nil { + err = infoErr + // If the info file is not found, we consider the upload to be non-existant + if isAwsError(err, "NoSuchKey") { + err = handler.ErrNotFound + } + return } - if err := json.NewDecoder(res.Body).Decode(&info); err != nil { - return info, err - } - - // Get uploaded parts and their offset - parts, err := store.listAllParts(ctx, id) - if err != nil { - // Check if the error is caused by the upload not being found. This happens + if partsErr != nil { + err = partsErr + // Check if the error is caused by the multipart upload not being found. This happens // when the multipart upload has already been completed or aborted. Since // we already found the info object, we know that the upload has been // completed and therefore can ensure the the offset is the size. if isAwsError(err, "NoSuchUpload") { info.Offset = info.Size - return info, nil - } else { - return info, err + err = nil } + return } - offset := int64(0) + if incompletePartSizeErr != nil { + err = incompletePartSizeErr + return + } + // The offset is the sum of all part sizes and the size of the incomplete part file. + offset := incompletePartSize for _, part := range parts { - offset += *part.Size - } - - incompletePartObject, err := store.getIncompletePartForUpload(ctx, uploadId) - if err != nil { - return info, err - } - if incompletePartObject != nil { - defer incompletePartObject.Body.Close() - offset += *incompletePartObject.ContentLength + offset += part.size } info.Offset = offset - return + return info, parts, incompletePartSize, nil } func (upload s3Upload) GetReader(ctx context.Context) (io.Reader, error) { @@ -640,7 +739,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { uploadId, multipartId := splitIds(id) // Get uploaded parts - parts, err := store.listAllParts(ctx, id) + _, parts, _, err := upload.getInternalInfo(ctx) if err != nil { return err } @@ -660,10 +759,11 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { return err } - parts = []*s3.Part{ - &s3.Part{ - ETag: res.ETag, - PartNumber: aws.Int64(1), + parts = []*s3Part{ + &s3Part{ + etag: *res.ETag, + number: 1, + size: 0, }, } @@ -675,8 +775,8 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { for index, part := range parts { completedParts[index] = &s3.CompletedPart{ - ETag: part.ETag, - PartNumber: part.PartNumber, + ETag: aws.String(part.etag), + PartNumber: aws.Int64(part.number), } } @@ -790,10 +890,16 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads partialS3Upload := partialUpload.(*s3Upload) partialId, _ := splitIds(partialS3Upload.id) + upload.parts = append(upload.parts, &s3Part{ + number: int64(i + 1), + size: -1, + etag: "", + }) + go func(i int, partialId string) { defer wg.Done() - _, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{ + res, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(uploadId), UploadId: aws.String(multipartId), @@ -806,6 +912,8 @@ func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads errs = append(errs, err) return } + + upload.parts[i].etag = *res.CopyPartResult.ETag }(i, partialId) } @@ -829,7 +937,7 @@ func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error { return upload.writeInfo(ctx, info) } -func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.Part, err error) { +func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Part, err error) { uploadId, multipartId := splitIds(id) partMarker := int64(0) @@ -845,7 +953,14 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.P return nil, err } - parts = append(parts, (*listPtr).Parts...) + // TODO: Find more efficient way when appending many elements + for _, part := range (*listPtr).Parts { + parts = append(parts, &s3Part{ + number: *part.PartNumber, + size: *part.Size, + etag: *part.ETag, + }) + } if listPtr.IsTruncated != nil && *listPtr.IsTruncated { partMarker = *listPtr.NextPartNumberMarker @@ -856,36 +971,36 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.P return parts, nil } -func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, int64, error) { +func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, error) { incompleteUploadObject, err := store.getIncompletePartForUpload(ctx, uploadId) if err != nil { - return nil, 0, err + return nil, err } if incompleteUploadObject == nil { // We did not find an incomplete upload - return nil, 0, nil + return nil, nil } defer incompleteUploadObject.Body.Close() partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-") if err != nil { - return nil, 0, err + return nil, err } n, err := io.Copy(partFile, incompleteUploadObject.Body) if err != nil { - return nil, 0, err + return nil, err } if n < *incompleteUploadObject.ContentLength { - return nil, 0, errors.New("short read of incomplete upload") + return nil, errors.New("short read of incomplete upload") } _, err = partFile.Seek(0, 0) if err != nil { - return nil, 0, err + return nil, err } - return partFile, n, nil + return partFile, nil } func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId string) (*s3.GetObjectOutput, error) { @@ -901,6 +1016,22 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st return obj, err } +func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId string) (int64, error) { + obj, err := store.Service.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(store.Bucket), + Key: store.metadataKeyWithPrefix(uploadId + ".part"), + }) + + if err != nil { + if isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "NotFound") || isAwsError(err, "AccessDenied") { + err = nil + } + return 0, err + } + + return *obj.ContentLength, nil +} + func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error { defer cleanUpTempFile(file) diff --git a/pkg/s3store/s3store_mock_test.go b/pkg/s3store/s3store_mock_test.go index 5b5b934..3d2af90 100644 --- a/pkg/s3store/s3store_mock_test.go +++ b/pkg/s3store/s3store_mock_test.go @@ -6,10 +6,11 @@ package s3store import ( context "context" + reflect "reflect" + request "github.com/aws/aws-sdk-go/aws/request" s3 "github.com/aws/aws-sdk-go/service/s3" gomock "github.com/golang/mock/gomock" - reflect "reflect" ) // MockS3API is a mock of S3API interface @@ -155,6 +156,26 @@ func (mr *MockS3APIMockRecorder) GetObjectWithContext(arg0, arg1 interface{}, ar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectWithContext", reflect.TypeOf((*MockS3API)(nil).GetObjectWithContext), varargs...) } +// HeadObjectWithContext mocks base method +func (m *MockS3API) HeadObjectWithContext(arg0 context.Context, arg1 *s3.HeadObjectInput, arg2 ...request.Option) (*s3.HeadObjectOutput, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "HeadObjectWithContext", varargs...) + ret0, _ := ret[0].(*s3.HeadObjectOutput) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HeadObjectWithContext indicates an expected call of HeadObjectWithContext +func (mr *MockS3APIMockRecorder) HeadObjectWithContext(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeadObjectWithContext", reflect.TypeOf((*MockS3API)(nil).HeadObjectWithContext), varargs...) +} + // ListPartsWithContext mocks base method func (m *MockS3API) ListPartsWithContext(arg0 context.Context, arg1 *s3.ListPartsInput, arg2 ...request.Option) (*s3.ListPartsOutput, error) { m.ctrl.T.Helper() diff --git a/pkg/s3store/s3store_part_producer.go b/pkg/s3store/s3store_part_producer.go index 80b3f85..94d1a0e 100644 --- a/pkg/s3store/s3store_part_producer.go +++ b/pkg/s3store/s3store_part_producer.go @@ -8,45 +8,79 @@ import ( // s3PartProducer converts a stream of bytes from the reader into a stream of files on disk type s3PartProducer struct { - store *S3Store - files chan<- *os.File - done chan struct{} - err error - r io.Reader + tmpDir string + files chan fileChunk + done chan struct{} + err error + r io.Reader +} + +type fileChunk struct { + file *os.File + size int64 +} + +func newS3PartProducer(source io.Reader, backlog int64, tmpDir string) (s3PartProducer, <-chan fileChunk) { + fileChan := make(chan fileChunk, backlog) + doneChan := make(chan struct{}) + + partProducer := s3PartProducer{ + tmpDir: tmpDir, + done: doneChan, + files: fileChan, + r: source, + } + + return partProducer, fileChan +} + +// stop should always be called by the consumer to ensure that the channels +// are properly closed and emptied. +func (spp *s3PartProducer) stop() { + close(spp.done) + + // If we return while there are still files in the channel, then + // we may leak file descriptors. Let's ensure that those are cleaned up. + for fileChunk := range spp.files { + cleanUpTempFile(fileChunk.file) + } } func (spp *s3PartProducer) produce(partSize int64) { +outerloop: for { - file, err := spp.nextPart(partSize) + file, ok, err := spp.nextPart(partSize) if err != nil { + // An error occured. Stop producing. spp.err = err - close(spp.files) - return + break } - if file == nil { - close(spp.files) - return + if !ok { + // The source was fully read. Stop producing. + break } select { case spp.files <- file: case <-spp.done: - close(spp.files) - return + // We are told to stop producing. Stop producing. + break outerloop } } + + close(spp.files) } -func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) { +func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) { // Create a temporary file to store the part - file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-") + file, err := ioutil.TempFile(spp.tmpDir, "tusd-s3-tmp-") if err != nil { - return nil, err + return fileChunk{}, false, err } limitedReader := io.LimitReader(spp.r, size) n, err := io.Copy(file, limitedReader) if err != nil { - return nil, err + return fileChunk{}, false, err } // If the entire request body is read and no more data is available, @@ -54,11 +88,14 @@ func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) { // case, we can close the s3PartProducer. if n == 0 { cleanUpTempFile(file) - return nil, nil + return fileChunk{}, false, nil } // Seek to the beginning of the file file.Seek(0, 0) - return file, nil + return fileChunk{ + file: file, + size: n, + }, true, nil } diff --git a/pkg/s3store/s3store_part_producer_test.go b/pkg/s3store/s3store_part_producer_test.go index ad7ad2f..1bf9f0e 100644 --- a/pkg/s3store/s3store_part_producer_test.go +++ b/pkg/s3store/s3store_part_producer_test.go @@ -22,32 +22,28 @@ func (ErrorReader) Read(b []byte) (int, error) { } func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) { - fileChan := make(chan *os.File) - doneChan := make(chan struct{}) expectedStr := "test" r := strings.NewReader(expectedStr) - pp := s3PartProducer{ - store: &S3Store{}, - done: doneChan, - files: fileChan, - r: r, - } + pp, fileChan := newS3PartProducer(r, 0, "") go pp.produce(1) actualStr := "" b := make([]byte, 1) - for f := range fileChan { - n, err := f.Read(b) + for chunk := range fileChan { + n, err := chunk.file.Read(b) if err != nil { t.Fatalf("unexpected error: %s", err) } if n != 1 { t.Fatalf("incorrect number of bytes read: wanted %d, got %d", 1, n) } + if chunk.size != 1 { + t.Fatalf("incorrect number of bytes in struct: wanted %d, got %d", 1, chunk.size) + } actualStr += string(b) - os.Remove(f.Name()) - f.Close() + os.Remove(chunk.file.Name()) + chunk.file.Close() } if actualStr != expectedStr { @@ -59,15 +55,8 @@ func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) { } } -func TestPartProducerExitsWhenDoneChannelIsClosed(t *testing.T) { - fileChan := make(chan *os.File) - doneChan := make(chan struct{}) - pp := s3PartProducer{ - store: &S3Store{}, - done: doneChan, - files: fileChan, - r: InfiniteZeroReader{}, - } +func TestPartProducerExitsWhenProducerIsStopped(t *testing.T) { + pp, fileChan := newS3PartProducer(InfiniteZeroReader{}, 0, "") completedChan := make(chan struct{}) go func() { @@ -75,35 +64,7 @@ func TestPartProducerExitsWhenDoneChannelIsClosed(t *testing.T) { completedChan <- struct{}{} }() - close(doneChan) - - select { - case <-completedChan: - // producer exited cleanly - case <-time.After(2 * time.Second): - t.Error("timed out waiting for producer to exit") - } - - safelyDrainChannelOrFail(fileChan, t) -} - -func TestPartProducerExitsWhenDoneChannelIsClosedBeforeAnyPartIsSent(t *testing.T) { - fileChan := make(chan *os.File) - doneChan := make(chan struct{}) - pp := s3PartProducer{ - store: &S3Store{}, - done: doneChan, - files: fileChan, - r: InfiniteZeroReader{}, - } - - close(doneChan) - - completedChan := make(chan struct{}) - go func() { - pp.produce(10) - completedChan <- struct{}{} - }() + pp.stop() select { case <-completedChan: @@ -116,14 +77,7 @@ func TestPartProducerExitsWhenDoneChannelIsClosedBeforeAnyPartIsSent(t *testing. } func TestPartProducerExitsWhenUnableToReadFromFile(t *testing.T) { - fileChan := make(chan *os.File) - doneChan := make(chan struct{}) - pp := s3PartProducer{ - store: &S3Store{}, - done: doneChan, - files: fileChan, - r: ErrorReader{}, - } + pp, fileChan := newS3PartProducer(ErrorReader{}, 0, "") completedChan := make(chan struct{}) go func() { @@ -145,12 +99,12 @@ func TestPartProducerExitsWhenUnableToReadFromFile(t *testing.T) { } } -func safelyDrainChannelOrFail(c chan *os.File, t *testing.T) { +func safelyDrainChannelOrFail(c <-chan fileChunk, t *testing.T) { // At this point, we've signaled that the producer should exit, but it may write a few files // into the channel before closing it and exiting. Make sure that we get a nil value // eventually. for i := 0; i < 100; i++ { - if f := <-c; f == nil { + if _, more := <-c; !more { return } } diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 88ec09a..cc78563 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -171,6 +171,9 @@ func TestNewUploadWithMetadataObjectPrefix(t *testing.T) { assert.NotNil(upload) } +// This test ensures that an newly created upload without any chunks can be +// directly finished. There are no calls to ListPart or HeadObject because +// the upload is not fetched from S3 first. func TestEmptyUpload(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() @@ -193,14 +196,6 @@ func TestEmptyUpload(t *testing.T) { Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), ContentLength: aws.Int64(int64(208)), }), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{}, - }, nil), s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), @@ -272,6 +267,17 @@ func TestGetInfoNotFound(t *testing.T) { Key: aws.String("uploadId.info"), }).Return(nil, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(nil, awserr.New("NoSuchUpload", "Not found", nil)) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, awserr.New("NoSuchKey", "Not found", nil)) + upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -287,47 +293,52 @@ func TestGetInfo(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + PartNumber: aws.Int64(1), + Size: aws.Int64(100), + ETag: aws.String("etag-1"), }, - NextPartNumberMarker: aws.Int64(2), - IsTruncated: aws.Bool(true), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(2), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, + { + PartNumber: aws.Int64(2), + Size: aws.Int64(200), + ETag: aws.String("etag-2"), }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), - ) + }, + NextPartNumberMarker: aws.Int64(2), + // Simulate a truncated response, so s3store should send a second request + IsTruncated: aws.Bool(true), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(2), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + PartNumber: aws.Int64(3), + Size: aws.Int64(100), + ETag: aws.String("etag-3"), + }, + }, + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, awserr.New("NoSuchKey", "Not found", nil)) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -353,47 +364,52 @@ func TestGetInfoWithMetadataObjectPrefix(t *testing.T) { store := New("bucket", s3obj) store.MetadataObjectPrefix = "my/metadata" - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("my/metadata/uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/metadata/uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"my/uploaded/files/uploadId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + PartNumber: aws.Int64(1), + Size: aws.Int64(100), + ETag: aws.String("etag-1"), }, - NextPartNumberMarker: aws.Int64(2), - IsTruncated: aws.Bool(true), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(2), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, + { + PartNumber: aws.Int64(2), + Size: aws.Int64(200), + ETag: aws.String("etag-2"), }, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("my/metadata/uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), - ) + }, + NextPartNumberMarker: aws.Int64(2), + // Simulate a truncated response, so s3store should send a second request + IsTruncated: aws.Bool(true), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(2), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + PartNumber: aws.Int64(3), + Size: aws.Int64(100), + ETag: aws.String("etag-3"), + }, + }, + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("my/metadata/uploadId.part"), + }).Return(nil, awserr.New("NoSuchKey", "Not found", nil)) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -418,27 +434,24 @@ func TestGetInfoWithIncompletePart(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{ - ContentLength: aws.Int64(10), - Body: ioutil.NopCloser(bytes.NewReader([]byte("0123456789"))), - }, nil), - ) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(10), + }, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -457,20 +470,22 @@ func TestGetInfoFinished(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)), - ) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, awserr.New("NoSuchKey", "Not found", nil)) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -572,32 +587,30 @@ func TestDeclareLength(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`))), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{}, - }, nil), - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(nil, awserr.New("NotFound", "Not Found", nil)), - s3obj.EXPECT().PutObjectWithContext(context.Background(), &s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.info"), - Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), - ContentLength: aws.Int64(int64(208)), - }), - ) + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":true,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{}, + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, awserr.New("NotFound", "Not Found", nil)) + s3obj.EXPECT().PutObjectWithContext(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{},"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), + ContentLength: aws.Int64(int64(208)), + }) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -617,64 +630,72 @@ func TestFinishUpload(t *testing.T) { s3obj := NewMockS3API(mockCtrl) store := New("bucket", s3obj) - gomock.InOrder( - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ + s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + }).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":400,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), + ETag: aws.String("etag-1"), + PartNumber: aws.Int64(1), + }, + { + Size: aws.Int64(200), + ETag: aws.String("etag-2"), + PartNumber: aws.Int64(2), + }, + }, + NextPartNumberMarker: aws.Int64(2), + IsTruncated: aws.Bool(true), + }, nil) + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(2), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{ + { + Size: aws.Int64(100), + ETag: aws.String("etag-3"), + PartNumber: aws.Int64(3), + }, + }, + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(nil, awserr.New("NotFound", "Not Found", nil)) + s3obj.EXPECT().CompleteMultipartUploadWithContext(context.Background(), &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: []*s3.CompletedPart{ { - Size: aws.Int64(100), - ETag: aws.String("foo"), + ETag: aws.String("etag-1"), PartNumber: aws.Int64(1), }, { - Size: aws.Int64(200), - ETag: aws.String("bar"), + ETag: aws.String("etag-2"), PartNumber: aws.Int64(2), }, - }, - NextPartNumberMarker: aws.Int64(2), - IsTruncated: aws.Bool(true), - }, nil), - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(2), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ { - Size: aws.Int64(100), - ETag: aws.String("foobar"), + ETag: aws.String("etag-3"), PartNumber: aws.Int64(3), }, }, - }, nil), - s3obj.EXPECT().CompleteMultipartUploadWithContext(context.Background(), &s3.CompleteMultipartUploadInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - MultipartUpload: &s3.CompletedMultipartUpload{ - Parts: []*s3.CompletedPart{ - { - ETag: aws.String("foo"), - PartNumber: aws.Int64(1), - }, - { - ETag: aws.String("bar"), - PartNumber: aws.Int64(2), - }, - { - ETag: aws.String("foobar"), - PartNumber: aws.Int64(3), - }, - }, - }, - }).Return(nil, nil), - ) + }, + }).Return(nil, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -696,6 +717,7 @@ func TestWriteChunk(t *testing.T) { store.MaxMultipartParts = 10000 store.MaxObjectSize = 5 * 1024 * 1024 * 1024 * 1024 + // From GetInfo s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), @@ -710,50 +732,55 @@ func TestWriteChunk(t *testing.T) { }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { - Size: aws.Int64(100), + Size: aws.Int64(100), + ETag: aws.String("etag-1"), + PartNumber: aws.Int64(1), }, { - Size: aws.Int64(200), + Size: aws.Int64(200), + ETag: aws.String("etag-2"), + PartNumber: aws.Int64(2), }, }, - }, nil).Times(2) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) + }).Return(nil, awserr.New("NoSuchKey", "Not found", nil)) - gomock.InOrder( - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(3), - Body: bytes.NewReader([]byte("1234")), - })).Return(nil, nil), - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(4), - Body: bytes.NewReader([]byte("5678")), - })).Return(nil, nil), - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(5), - Body: bytes.NewReader([]byte("90AB")), - })).Return(nil, nil), - s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - Body: bytes.NewReader([]byte("CD")), - })).Return(nil, nil), - ) + // From WriteChunk + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(3), + Body: bytes.NewReader([]byte("1234")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-3"), + }, nil) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(4), + Body: bytes.NewReader([]byte("5678")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-4"), + }, nil) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(5), + Body: bytes.NewReader([]byte("90AB")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-5"), + }, nil) + s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("CD")), + })).Return(nil, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -785,29 +812,27 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) { }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { - Size: aws.Int64(100), + Size: aws.Int64(100), + ETag: aws.String("etag-1"), + PartNumber: aws.Int64(1), }, { - Size: aws.Int64(200), + Size: aws.Int64(200), + ETag: aws.String("etag-2"), + PartNumber: aws.Int64(2), }, }, - }, nil).Times(2) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist", nil)) + }).Return(nil, awserr.New("NoSuchKey", "The specified key does not exist", nil)) - gomock.InOrder( - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)), - s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - Body: bytes.NewReader([]byte("1234567890")), - })).Return(nil, nil), - ) + s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("1234567890")), + })).Return(nil, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -836,12 +861,19 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { }).Return(&s3.GetObjectOutput{ Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":5,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), }, nil) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumberMarker: aws.Int64(0), + }).Return(&s3.ListPartsOutput{ + Parts: []*s3.Part{}, + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{ + }).Return(&s3.HeadObjectOutput{ ContentLength: aws.Int64(3), - Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), }, nil) s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), @@ -854,29 +886,25 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { Bucket: aws.String(store.Bucket), Key: aws.String("uploadId.part"), }).Return(&s3.DeleteObjectOutput{}, nil) - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2) - gomock.InOrder( - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(1), - Body: bytes.NewReader([]byte("1234")), - })).Return(nil, nil), - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(2), - Body: bytes.NewReader([]byte("5")), - })).Return(nil, nil), - ) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(1), + Body: bytes.NewReader([]byte("1234")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-1"), + }, nil) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(2), + Body: bytes.NewReader([]byte("5")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-2"), + }, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -910,33 +938,39 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing. Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2) + }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + }).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(3), + }, nil) s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), }).Return(&s3.GetObjectOutput{ ContentLength: aws.Int64(3), Body: ioutil.NopCloser(bytes.NewReader([]byte("123"))), - }, nil).Times(2) + }, nil) s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{ Bucket: aws.String(store.Bucket), Key: aws.String("uploadId.part"), }).Return(&s3.DeleteObjectOutput{}, nil) - gomock.InOrder( - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(1), - Body: bytes.NewReader([]byte("1234")), - })).Return(nil, nil), - s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - Body: bytes.NewReader([]byte("5")), - })).Return(nil, nil), - ) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(1), + Body: bytes.NewReader([]byte("1234")), + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-1"), + }, nil) + s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("5")), + })).Return(nil, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -969,28 +1003,30 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { }).Return(&s3.ListPartsOutput{ Parts: []*s3.Part{ { - Size: aws.Int64(400), + PartNumber: aws.Int64(1), + Size: aws.Int64(400), + ETag: aws.String("etag-1"), }, { - Size: aws.Int64(90), + PartNumber: aws.Int64(2), + Size: aws.Int64(90), + ETag: aws.String("etag-2"), }, }, - }, nil).Times(2) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("AccessDenied", "Access Denied.", nil)) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) + }).Return(nil, awserr.New("AccessDenied", "Access Denied.", nil)) s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), PartNumber: aws.Int64(3), Body: bytes.NewReader([]byte("1234567890")), - })).Return(nil, nil) + })).Return(&s3.UploadPartOutput{ + ETag: aws.String("etag-3"), + }, nil) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -1101,13 +1137,33 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { store := New("bucket", s3obj) store.MinPartSize = 100 + // Calls from NewUpload + s3obj.EXPECT().CreateMultipartUploadWithContext(context.Background(), &s3.CreateMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + Metadata: map[string]*string{}, + }).Return(&s3.CreateMultipartUploadOutput{ + UploadId: aws.String("multipartId"), + }, nil) + s3obj.EXPECT().PutObjectWithContext(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.info"), + Body: bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":0,"SizeIsDeferred":false,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":true,"PartialUploads":["aaa+AAA","bbb+BBB","ccc+CCC"],"Storage":{"Bucket":"bucket","Key":"uploadId","Type":"s3store"}}`)), + ContentLength: aws.Int64(int64(234)), + }) + + // Calls from ConcatUploads s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId"), UploadId: aws.String("multipartId"), CopySource: aws.String("bucket/aaa"), PartNumber: aws.Int64(1), - }).Return(nil, nil) + }).Return(&s3.UploadPartCopyOutput{ + CopyPartResult: &s3.CopyPartResult{ + ETag: aws.String("etag-1"), + }, + }, nil) s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{ Bucket: aws.String("bucket"), @@ -1115,7 +1171,11 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { UploadId: aws.String("multipartId"), CopySource: aws.String("bucket/bbb"), PartNumber: aws.Int64(2), - }).Return(nil, nil) + }).Return(&s3.UploadPartCopyOutput{ + CopyPartResult: &s3.CopyPartResult{ + ETag: aws.String("etag-2"), + }, + }, nil) s3obj.EXPECT().UploadPartCopyWithContext(context.Background(), &s3.UploadPartCopyInput{ Bucket: aws.String("bucket"), @@ -1123,55 +1183,45 @@ func TestConcatUploadsUsingMultipart(t *testing.T) { UploadId: aws.String("multipartId"), CopySource: aws.String("bucket/ccc"), PartNumber: aws.Int64(3), - }).Return(nil, nil) + }).Return(&s3.UploadPartCopyOutput{ + CopyPartResult: &s3.CopyPartResult{ + ETag: aws.String("etag-3"), + }, + }, nil) - // Output from s3Store.FinishUpload - gomock.InOrder( - s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumberMarker: aws.Int64(0), - }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ + // Calls from FinishUpload + s3obj.EXPECT().CompleteMultipartUploadWithContext(context.Background(), &s3.CompleteMultipartUploadInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: []*s3.CompletedPart{ { - ETag: aws.String("foo"), + ETag: aws.String("etag-1"), PartNumber: aws.Int64(1), }, { - ETag: aws.String("bar"), + ETag: aws.String("etag-2"), PartNumber: aws.Int64(2), }, { - ETag: aws.String("baz"), + ETag: aws.String("etag-3"), PartNumber: aws.Int64(3), }, }, - }, nil), - s3obj.EXPECT().CompleteMultipartUploadWithContext(context.Background(), &s3.CompleteMultipartUploadInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - MultipartUpload: &s3.CompletedMultipartUpload{ - Parts: []*s3.CompletedPart{ - { - ETag: aws.String("foo"), - PartNumber: aws.Int64(1), - }, - { - ETag: aws.String("bar"), - PartNumber: aws.Int64(2), - }, - { - ETag: aws.String("baz"), - PartNumber: aws.Int64(3), - }, - }, - }, - }).Return(nil, nil), - ) + }, + }).Return(nil, nil) - upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") + info := handler.FileInfo{ + ID: "uploadId", + IsFinal: true, + PartialUploads: []string{ + "aaa+AAA", + "bbb+BBB", + "ccc+CCC", + }, + } + upload, err := store.NewUpload(context.Background(), info) assert.Nil(err) uploadA, err := store.GetUpload(context.Background(), "aaa+AAA") @@ -1269,13 +1319,13 @@ type s3APIWithTempFileAssertion struct { func (s s3APIWithTempFileAssertion) UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) { assert := s.assert - // Make sure that only the two temporary files from tusd are in here. + // Make sure that there are temporary files from tusd in here. files, err := ioutil.ReadDir(s.tempDir) assert.Nil(err) for _, file := range files { assert.True(strings.HasPrefix(file.Name(), "tusd-s3-tmp-")) } - assert.Equal(len(files), 2) + assert.True(len(files) > 0) return nil, fmt.Errorf("not now") } @@ -1316,7 +1366,7 @@ func TestWriteChunkCleansUpTempFiles(t *testing.T) { Bucket: aws.String("bucket"), Key: aws.String("uploadId.info"), }).Return(&s3.GetObjectOutput{ - Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":500,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), + Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"ID":"uploadId","Size":14,"Offset":0,"MetaData":null,"IsPartial":false,"IsFinal":false,"PartialUploads":null,"Storage":null}`))), }, nil) s3obj.EXPECT().ListPartsWithContext(context.Background(), &s3.ListPartsInput{ Bucket: aws.String("bucket"), @@ -1324,30 +1374,19 @@ func TestWriteChunkCleansUpTempFiles(t *testing.T) { UploadId: aws.String("multipartId"), PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{ - Parts: []*s3.Part{ - { - Size: aws.Int64(100), - }, - { - Size: aws.Int64(200), - }, - }, - }, nil).Times(2) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ + Parts: []*s3.Part{}, + }, nil) + s3obj.EXPECT().HeadObjectWithContext(context.Background(), &s3.HeadObjectInput{ Bucket: aws.String("bucket"), Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)) - s3obj.EXPECT().GetObjectWithContext(context.Background(), &s3.GetObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) + }).Return(nil, awserr.New("NoSuchKey", "Not found", nil)) // No calls to s3obj.EXPECT().UploadPartWithContext since that is handled by s3APIWithTempFileAssertion upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) - bytesRead, err := upload.WriteChunk(context.Background(), 300, bytes.NewReader([]byte("1234567890ABCD"))) + bytesRead, err := upload.WriteChunk(context.Background(), 0, bytes.NewReader([]byte("1234567890ABCD"))) assert.NotNil(err) assert.Equal(err.Error(), "not now") assert.Equal(int64(0), bytesRead)