diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index fe64b3c..124ee0d 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -159,6 +159,8 @@ type S3Store struct { // CPU, so it might be desirable to disable them. // Note that this property is experimental and might be removed in the future! DisableContentHashes bool + + uploadQueue *s3UploadQueue } type S3API interface { @@ -177,15 +179,16 @@ type S3API interface { // New constructs a new storage using the supplied bucket and service object. func New(bucket string, service S3API) S3Store { return S3Store{ - Bucket: bucket, - Service: service, - MaxPartSize: 5 * 1024 * 1024 * 1024, - MinPartSize: 5 * 1024 * 1024, - PreferredPartSize: 50 * 1024 * 1024, - MaxMultipartParts: 10000, - MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, - MaxBufferedParts: 20, - TemporaryDirectory: "", + Bucket: bucket, + Service: service, + MaxPartSize: 5 * 1024 * 1024 * 1024, + MinPartSize: 5 * 1024 * 1024, + PreferredPartSize: 50 * 1024 * 1024, + MaxMultipartParts: 10000, + MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, + MaxBufferedParts: 20, + ConcurrentPartUploads: 8, + TemporaryDirectory: "", } // TODO: Start goroutine @@ -328,6 +331,10 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read return 0, err } + if store.uploadQueue == nil { + store.uploadQueue = newS3UploadQueue(store.Service, store.ConcurrentPartUploads, store.MaxBufferedParts, store.DisableContentHashes) + } + numParts := len(parts) nextPartNum := int64(numParts + 1) @@ -357,6 +364,27 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read } }() + resultChannel := make(chan *s3UploadJob) + var wg sync.WaitGroup + var uploadErr error + defer close(resultChannel) + go func() { + for { + job, more := <-resultChannel + if !more { + break + } + + wg.Done() + fmt.Println("Job completed", *job.uploadPartInput.PartNumber, job.err) + if job.err != nil { + uploadErr = job.err + } else { + bytesUploaded += job.size + } + } + }() + partProducer := s3PartProducer{ store: store, done: doneChan, @@ -380,10 +408,27 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read UploadId: aws.String(multipartId), PartNumber: aws.Int64(nextPartNum), } - if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil { - return bytesUploaded, err - } + store.uploadQueue.push(&s3UploadJob{ + ctx: ctx, + uploadPartInput: uploadPartInput, + file: file, + size: n, + resultChannel: resultChannel, + }) + wg.Add(1) + //if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil { + // return bytesUploaded, err + //} } else { + wg.Wait() + + if uploadErr != nil { + cleanUpTempFile(file) + return bytesUploaded, uploadErr + } + + fmt.Println("putIncompletePartForUpload", n >= store.MinPartSize, isFinalChunk) + if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil { return bytesUploaded, err } @@ -394,10 +439,11 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read } offset += n - bytesUploaded += n nextPartNum += 1 } + wg.Wait() + return bytesUploaded - incompletePartSize, partProducer.err } diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 88ec09a..4cc0990 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -726,34 +726,34 @@ func TestWriteChunk(t *testing.T) { Key: aws.String("uploadId.part"), }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) - gomock.InOrder( - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(3), - Body: bytes.NewReader([]byte("1234")), - })).Return(nil, nil), - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(4), - Body: bytes.NewReader([]byte("5678")), - })).Return(nil, nil), - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(5), - Body: bytes.NewReader([]byte("90AB")), - })).Return(nil, nil), - s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId.part"), - Body: bytes.NewReader([]byte("CD")), - })).Return(nil, nil), - ) + //gomock.InOrder( + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(3), + Body: bytes.NewReader([]byte("1234")), + })).Return(nil, nil) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(4), + Body: bytes.NewReader([]byte("5678")), + })).Return(nil, nil) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(5), + Body: bytes.NewReader([]byte("90AB")), + })).Return(nil, nil) + s3obj.EXPECT().PutObjectWithContext(context.Background(), NewPutObjectInputMatcher(&s3.PutObjectInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId.part"), + Body: bytes.NewReader([]byte("CD")), + })).Return(nil, nil) + //) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -861,22 +861,22 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { PartNumberMarker: aws.Int64(0), }).Return(&s3.ListPartsOutput{Parts: []*s3.Part{}}, nil).Times(2) - gomock.InOrder( - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(1), - Body: bytes.NewReader([]byte("1234")), - })).Return(nil, nil), - s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ - Bucket: aws.String("bucket"), - Key: aws.String("uploadId"), - UploadId: aws.String("multipartId"), - PartNumber: aws.Int64(2), - Body: bytes.NewReader([]byte("5")), - })).Return(nil, nil), - ) + //gomock.InOrder( + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(1), + Body: bytes.NewReader([]byte("1234")), + })).Return(nil, nil) + s3obj.EXPECT().UploadPartWithContext(context.Background(), NewUploadPartInputMatcher(&s3.UploadPartInput{ + Bucket: aws.String("bucket"), + Key: aws.String("uploadId"), + UploadId: aws.String("multipartId"), + PartNumber: aws.Int64(2), + Body: bytes.NewReader([]byte("5")), + })).Return(nil, nil) + //) upload, err := store.GetUpload(context.Background(), "uploadId+multipartId") assert.Nil(err) @@ -1267,6 +1267,7 @@ type s3APIWithTempFileAssertion struct { } func (s s3APIWithTempFileAssertion) UploadPartWithContext(context.Context, *s3.UploadPartInput, ...request.Option) (*s3.UploadPartOutput, error) { + fmt.Println("Helloz") assert := s.assert // Make sure that only the two temporary files from tusd are in here. diff --git a/pkg/s3store/s3store_upload_queue.go b/pkg/s3store/s3store_upload_queue.go index 9707e7b..275c834 100644 --- a/pkg/s3store/s3store_upload_queue.go +++ b/pkg/s3store/s3store_upload_queue.go @@ -44,7 +44,7 @@ func newS3UploadQueue(service S3API, concurrency int64, maxBufferedParts int64, disableContentHashes: disableContentHashes, } - for i := 0; i < concurrency; i++ { + for i := 0; i < int(concurrency); i++ { go s.uploadLoop() } @@ -58,6 +58,7 @@ func (s s3UploadQueue) queueLength() int { // push appends another item to the queue and returns immediately. func (s s3UploadQueue) push(job *s3UploadJob) { + // TODO: Handle closed channel s.queue <- job } @@ -75,6 +76,7 @@ func (s s3UploadQueue) uploadLoop() { } job.etag, job.err = s.putPartForUpload(job.ctx, job.uploadPartInput, job.file, job.size) + // TODO: Handle closed channel job.resultChannel <- job } } @@ -82,23 +84,24 @@ func (s s3UploadQueue) uploadLoop() { func (s s3UploadQueue) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) (etag string, err error) { // TODO: Move this back into s3store where the file is created defer cleanUpTempFile(file) + fmt.Println("Job started", *uploadPartInput.PartNumber) if !s.disableContentHashes { // By default, use the traditional approach to upload data uploadPartInput.Body = file - res, err := s.service.UploadPartWithContext(ctx, uploadPartInput) - if res.ETag != nil { - etag = *res.ETag - } + _, err = s.service.UploadPartWithContext(ctx, uploadPartInput) + //if res.ETag != nil { + //etag = *res.ETag + //} return etag, err } else { // Experimental feature to prevent the AWS SDK from calculating the SHA256 hash // for the parts we upload to S3. // We compute the presigned URL without the body attached and then send the request // on our own. This way, the body is not included in the SHA256 calculation. - s3api, ok := s.service.Service.(s3APIForPresigning) + s3api, ok := s.service.(s3APIForPresigning) if !ok { - return fmt.Errorf("s3store: failed to cast S3 service for presigning") + return "", fmt.Errorf("s3store: failed to cast S3 service for presigning") } s3Req, _ := s3api.UploadPartRequest(uploadPartInput)