From 8c5192c25406e1c45b8dac5edc61a70abfb38265 Mon Sep 17 00:00:00 2001 From: Marius Date: Mon, 26 Apr 2021 12:18:09 +0200 Subject: [PATCH] s3store: Move parts producer into own file --- pkg/s3store/s3store.go | 57 ------------------------- pkg/s3store/s3store_part_producer.go | 64 ++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 57 deletions(-) create mode 100644 pkg/s3store/s3store_part_producer.go diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 00d32a6..4d5e900 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -302,63 +302,6 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er return err } -// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk -type s3PartProducer struct { - store *S3Store - files chan<- *os.File - done chan struct{} - err error - r io.Reader -} - -func (spp *s3PartProducer) produce(partSize int64) { - for { - file, err := spp.nextPart(partSize) - if err != nil { - spp.err = err - close(spp.files) - return - } - if file == nil { - close(spp.files) - return - } - select { - case spp.files <- file: - case <-spp.done: - close(spp.files) - return - } - } -} - -func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) { - // Create a temporary file to store the part - file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-") - if err != nil { - return nil, err - } - - limitedReader := io.LimitReader(spp.r, size) - n, err := io.Copy(file, limitedReader) - if err != nil { - return nil, err - } - - // If the entire request body is read and no more data is available, - // io.Copy returns 0 since it is unable to read any bytes. In that - // case, we can close the s3PartProducer. - if n == 0 { - cleanUpTempFile(file) - return nil, nil - } - - // Seek to the beginning of the file - file.Seek(0, 0) - - return file, nil -} - func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { id := upload.id store := upload.store diff --git a/pkg/s3store/s3store_part_producer.go b/pkg/s3store/s3store_part_producer.go new file mode 100644 index 0000000..80b3f85 --- /dev/null +++ b/pkg/s3store/s3store_part_producer.go @@ -0,0 +1,64 @@ +package s3store + +import ( + "io" + "io/ioutil" + "os" +) + +// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk +type s3PartProducer struct { + store *S3Store + files chan<- *os.File + done chan struct{} + err error + r io.Reader +} + +func (spp *s3PartProducer) produce(partSize int64) { + for { + file, err := spp.nextPart(partSize) + if err != nil { + spp.err = err + close(spp.files) + return + } + if file == nil { + close(spp.files) + return + } + select { + case spp.files <- file: + case <-spp.done: + close(spp.files) + return + } + } +} + +func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) { + // Create a temporary file to store the part + file, err := ioutil.TempFile(spp.store.TemporaryDirectory, "tusd-s3-tmp-") + if err != nil { + return nil, err + } + + limitedReader := io.LimitReader(spp.r, size) + n, err := io.Copy(file, limitedReader) + if err != nil { + return nil, err + } + + // If the entire request body is read and no more data is available, + // io.Copy returns 0 since it is unable to read any bytes. In that + // case, we can close the s3PartProducer. + if n == 0 { + cleanUpTempFile(file) + return nil, nil + } + + // Seek to the beginning of the file + file.Seek(0, 0) + + return file, nil +}