s3store: Move parts producer into own file

Marius 2021-04-26 12:18:09 +02:00
parent b3bf854712
commit 8c5192c254
2 changed files with 64 additions and 57 deletions

View File

@@ -302,63 +302,6 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er
	return err
}
// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
type s3PartProducer struct {
	store *S3Store
	files chan<- *os.File
	done  chan struct{}
	err   error
	r     io.Reader
}

func (spp *s3PartProducer) produce(partSize int64) {
	for {
		file, err := spp.nextPart(partSize)
		if err != nil {
			spp.err = err
			close(spp.files)
			return
		}
		if file == nil {
			close(spp.files)
			return
		}
		select {
		case spp.files <- file:
		case <-spp.done:
			close(spp.files)
			return
		}
	}
}

func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
	// Create a temporary file to store the part
	file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-")
	if err != nil {
		return nil, err
	}

	limitedReader := io.LimitReader(spp.r, size)
	n, err := io.Copy(file, limitedReader)
	if err != nil {
		return nil, err
	}

	// If the entire request body is read and no more data is available,
	// io.Copy returns 0 since it is unable to read any bytes. In that
	// case, we can close the s3PartProducer.
	if n == 0 {
		cleanUpTempFile(file)
		return nil, nil
	}

	// Seek to the beginning of the file
	file.Seek(0, 0)

	return file, nil
}
func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
	id := upload.id
	store := upload.store

View File

@@ -0,0 +1,64 @@
package s3store

import (
	"io"
	"io/ioutil"
	"os"
)

// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
type s3PartProducer struct {
	store *S3Store
	files chan<- *os.File
	done  chan struct{}
	err   error
	r     io.Reader
}

func (spp *s3PartProducer) produce(partSize int64) {
	for {
		file, err := spp.nextPart(partSize)
		if err != nil {
			spp.err = err
			close(spp.files)
			return
		}
		if file == nil {
			close(spp.files)
			return
		}
		select {
		case spp.files <- file:
		case <-spp.done:
			close(spp.files)
			return
		}
	}
}

func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
	// Create a temporary file to store the part
	file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-")
	if err != nil {
		return nil, err
	}

	limitedReader := io.LimitReader(spp.r, size)
	n, err := io.Copy(file, limitedReader)
	if err != nil {
		return nil, err
	}

	// If the entire request body is read and no more data is available,
	// io.Copy returns 0 since it is unable to read any bytes. In that
	// case, we can close the s3PartProducer.
	if n == 0 {
		cleanUpTempFile(file)
		return nil, nil
	}

	// Seek to the beginning of the file
	file.Seek(0, 0)

	return file, nil
}