diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 2a62236..7b59956 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -135,6 +135,11 @@ type S3Store struct { // MaxObjectSize is the maximum size an S3 Object can have according to S3 // API specifications. See link above. MaxObjectSize int64 + // MaxBufferedParts is the number of additional parts that can be received from + // the client and stored on disk while a part is being uploaded to S3. This + // can help improve throughput by not blocking the client while tusd is + // communicating with the S3 API, which can have unpredictable latency. + MaxBufferedParts int } type S3API interface { @@ -379,35 +384,26 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read src = io.MultiReader(incompletePartFile, src) } - for { - // Create a temporary file to store the part in it - file, err := ioutil.TempFile("", "tusd-s3-tmp-") - if err != nil { - return bytesUploaded, err - } + fileChan := make(chan *os.File, store.MaxBufferedParts) + doneChan := make(chan struct{}) + defer close(doneChan) + + chunkProducer := s3ChunkProducer{ + done: doneChan, + files: fileChan, + r: src, + } + go chunkProducer.produce(optimalPartSize) + + for file := range fileChan { defer os.Remove(file.Name()) defer file.Close() - limitedReader := io.LimitReader(src, optimalPartSize) - n, err := io.Copy(file, limitedReader) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for S3Store it's not important whether the stream has ended - // on purpose or accidentally. Therefore, we ignore this error to not - // prevent the remaining chunk to be stored on S3. - if err == io.ErrUnexpectedEOF { - err = nil - } - - // io.Copy does not return io.EOF, so we not have to handle it differently. + stat, err := file.Stat() if err != nil { - return bytesUploaded, err - } - // If io.Copy is finished reading, it will always return (0, nil). - if n == 0 { - return (bytesUploaded - incompletePartSize), nil + return 0, err } + n := stat.Size() // Seek to the beginning of the file file.Seek(0, 0) @@ -438,6 +434,8 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read bytesUploaded += n nextPartNum += 1 } + + return bytesUploaded - incompletePartSize, chunkProducer.err } func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {