Integrate s3ChunkProducer to support chunk buffering

Adam Jensen 2020-03-16 10:12:40 -04:00
parent 2b3fb49096
commit 54819d8a31
1 changed file with 22 additions and 24 deletions


@@ -135,6 +135,11 @@ type S3Store struct {
     // MaxObjectSize is the maximum size an S3 Object can have according to S3
     // API specifications. See link above.
     MaxObjectSize int64
+    // MaxBufferedParts is the number of additional parts that can be received from
+    // the client and stored on disk while a part is being uploaded to S3. This
+    // can help improve throughput by not blocking the client while tusd is
+    // communicating with the S3 API, which can have unpredictable latency.
+    MaxBufferedParts int
 }
 
 type S3API interface {
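
A minimal sketch of how the new MaxBufferedParts knob might be set when constructing the store; the s3store.New constructor and the AWS SDK client wiring shown here are assumptions for illustration, not part of this commit:

package main

import (
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"

    "github.com/tus/tusd/pkg/s3store"
)

func newStore() s3store.S3Store {
    // Assumed wiring: an AWS SDK S3 client plus the s3store.New constructor;
    // check the package for the exact signatures.
    s3Client := s3.New(session.Must(session.NewSession()))

    store := s3store.New("example-bucket", s3Client)
    // Let up to 4 additional parts be spooled to disk while one part is
    // being uploaded, so the client is not blocked on S3 latency.
    store.MaxBufferedParts = 4

    return store
}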
@@ -379,35 +384,26 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
         src = io.MultiReader(incompletePartFile, src)
     }
 
-    for {
-        // Create a temporary file to store the part in it
-        file, err := ioutil.TempFile("", "tusd-s3-tmp-")
-        if err != nil {
-            return bytesUploaded, err
-        }
+    fileChan := make(chan *os.File, store.MaxBufferedParts)
+    doneChan := make(chan struct{})
+    defer close(doneChan)
+
+    chunkProducer := s3ChunkProducer{
+        done:  doneChan,
+        files: fileChan,
+        r:     src,
+    }
+    go chunkProducer.produce(optimalPartSize)
+
+    for file := range fileChan {
         defer os.Remove(file.Name())
         defer file.Close()
 
-        limitedReader := io.LimitReader(src, optimalPartSize)
-        n, err := io.Copy(file, limitedReader)
-
-        // If the HTTP PATCH request gets interrupted in the middle (e.g. because
-        // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
-        // However, for S3Store it's not important whether the stream has ended
-        // on purpose or accidentally. Therefore, we ignore this error to not
-        // prevent the remaining chunk to be stored on S3.
-        if err == io.ErrUnexpectedEOF {
-            err = nil
-        }
-
-        // io.Copy does not return io.EOF, so we not have to handle it differently.
+        stat, err := file.Stat()
         if err != nil {
-            return bytesUploaded, err
+            return 0, err
         }
-        // If io.Copy is finished reading, it will always return (0, nil).
-        if n == 0 {
-            return (bytesUploaded - incompletePartSize), nil
-        }
+        n := stat.Size()
 
         // Seek to the beginning of the file
         file.Seek(0, 0)
@@ -438,6 +434,8 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
         bytesUploaded += n
         nextPartNum += 1
     }
+
+    return bytesUploaded - incompletePartSize, chunkProducer.err
 }
 
 func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
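
The s3ChunkProducer type itself is not shown in this commit. Going only by the fields and method referenced above (files, done, r, err, produce), a rough sketch of what such a producer could look like follows; it mirrors the removed inline logic (temporary files of at most optimalPartSize bytes, tolerating io.ErrUnexpectedEOF) and is an assumption, not the actual implementation:

package s3store

import (
    "io"
    "io/ioutil"
    "os"
)

// s3ChunkProducer (sketch) splits the request body into chunks of at most
// partSize bytes, spools each chunk into a temporary file and hands the
// files to the consumer loop in WriteChunk via the files channel.
type s3ChunkProducer struct {
    files chan *os.File // buffered with MaxBufferedParts slots
    done  chan struct{} // closed by the consumer to stop producing early
    err   error         // first error seen while producing
    r     io.Reader     // incoming request body
}

func (cp *s3ChunkProducer) produce(partSize int64) {
    // Closing the channel ends the consumer's range loop.
    defer close(cp.files)

    for {
        file, err := ioutil.TempFile("", "tusd-s3-tmp-")
        if err != nil {
            cp.err = err
            return
        }

        n, err := io.Copy(file, io.LimitReader(cp.r, partSize))
        // An interrupted PATCH request surfaces as io.ErrUnexpectedEOF; the
        // bytes read so far should still be uploaded, so it is not treated
        // as a failure (same reasoning as the code removed in this commit).
        if err == io.ErrUnexpectedEOF {
            err = nil
        }
        if err != nil {
            cp.err = err
            discardTempFile(file)
            return
        }
        if n == 0 {
            // Source is exhausted; nothing was written to this file.
            discardTempFile(file)
            return
        }

        select {
        case cp.files <- file:
            // The consumer now owns the file and will close and remove it.
        case <-cp.done:
            // The consumer stopped early (e.g. an S3 upload failed).
            discardTempFile(file)
            return
        }
    }
}

func discardTempFile(file *os.File) {
    file.Close()
    os.Remove(file.Name())
}

Decoupling reading from uploading this way is what the buffered fileChan in WriteChunk (sized by MaxBufferedParts) enables: the producer can run ahead of the S3 uploads, which is the throughput gain described in the new field's comment.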