From c0f2026e96db2a66f741aeffd609762594c93961 Mon Sep 17 00:00:00 2001
From: Marius
Date: Sun, 10 Apr 2022 21:13:07 +0200
Subject: [PATCH] s3store: Implement temporary support for buffering in memory

---
 pkg/s3store/s3store.go                    |  21 ++--
 pkg/s3store/s3store_part_producer.go      | 113 +++++++++++++++-------
 pkg/s3store/s3store_part_producer_test.go |   6 +-
 3 files changed, 90 insertions(+), 50 deletions(-)

diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go
index e7f3d00..9ddb517 100644
--- a/pkg/s3store/s3store.go
+++ b/pkg/s3store/s3store.go
@@ -461,8 +461,9 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 			break
 		}
 
-		partfile := fileChunk.file
+		partfile := fileChunk.reader
 		partsize := fileChunk.size
+		closePart := fileChunk.closeReader
 
 		isFinalChunk := !info.SizeIsDeferred && (size == offset+bytesUploaded+partsize)
 		if partsize >= store.MinPartSize || isFinalChunk {
@@ -474,9 +475,10 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 			upload.parts = append(upload.parts, part)
 
 			wg.Add(1)
-			go func(file *os.File, part *s3Part) {
+			go func(file io.ReadSeeker, part *s3Part, closePart func()) {
 				defer upload.store.uploadSemaphore.Release()
 				defer wg.Done()
+				defer closePart()
 
 				t := time.Now()
 				uploadPartInput := &s3.UploadPartInput{
@@ -492,18 +494,19 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 				} else {
 					part.etag = etag
 				}
-			}(partfile, part)
+			}(partfile, part, closePart)
 		} else {
 			wg.Add(1)
-			go func(file *os.File) {
+			go func(file io.ReadSeeker, closePart func()) {
 				defer upload.store.uploadSemaphore.Release()
 				defer wg.Done()
+				defer closePart()
 
 				if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil {
 					uploadErr = err
 				}
 				upload.incompletePartSize = partsize
-			}(partfile)
+			}(partfile, closePart)
 		}
 
 		bytesUploaded += partsize
@@ -524,9 +527,7 @@ func cleanUpTempFile(file *os.File) {
 	os.Remove(file.Name())
 }
 
-func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) (string, error) {
-	defer cleanUpTempFile(file)
-
+func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file io.ReadSeeker, size int64) (string, error) {
 	if !upload.store.DisableContentHashes {
 		// By default, use the traditional approach to upload data
 		uploadPartInput.Body = file
@@ -1101,9 +1102,7 @@ func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId s
 	return *obj.ContentLength, nil
 }
 
-func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
-	defer cleanUpTempFile(file)
-
+func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file io.ReadSeeker) error {
 	t := time.Now()
 	_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
 		Bucket:        aws.String(store.Bucket),
diff --git a/pkg/s3store/s3store_part_producer.go b/pkg/s3store/s3store_part_producer.go
index 0f7c9dd..07a80bd 100644
--- a/pkg/s3store/s3store_part_producer.go
+++ b/pkg/s3store/s3store_part_producer.go
@@ -1,6 +1,7 @@
 package s3store
 
 import (
+	"bytes"
 	"io"
 	"io/ioutil"
 	"os"
@@ -9,6 +10,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+const TEMP_DIR_USE_MEMORY = "_memory"
+
 // s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
 type s3PartProducer struct {
 	tmpDir                  string
@@ -20,14 +23,19 @@ type s3PartProducer struct {
 }
 
 type fileChunk struct {
-	file *os.File
-	size int64
+	reader      io.ReadSeeker
+	closeReader func()
+	size        int64
 }
 
 func newS3PartProducer(source io.Reader, backlog int64, tmpDir string, diskWriteDurationMetric prometheus.Summary) (s3PartProducer, <-chan fileChunk) {
 	fileChan := make(chan fileChunk, backlog)
 	doneChan := make(chan struct{})
 
+	if os.Getenv("TUSD_S3STORE_TEMP_MEMORY") == "1" {
+		tmpDir = TEMP_DIR_USE_MEMORY
+	}
+
 	partProducer := s3PartProducer{
 		tmpDir:                  tmpDir,
 		done:                    doneChan,
@@ -47,7 +55,7 @@ func (spp *s3PartProducer) stop() {
 	// If we return while there are still files in the channel, then
 	// we may leak file descriptors. Let's ensure that those are cleaned up.
 	for fileChunk := range spp.files {
-		cleanUpTempFile(fileChunk.file)
+		fileChunk.closeReader()
 	}
 }
 
@@ -76,37 +84,72 @@ outerloop:
 }
 
 func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
-	// Create a temporary file to store the part
-	file, err := ioutil.TempFile(spp.tmpDir, "tusd-s3-tmp-")
-	if err != nil {
-		return fileChunk{}, false, err
+	if spp.tmpDir != TEMP_DIR_USE_MEMORY {
+		// Create a temporary file to store the part
+		file, err := ioutil.TempFile(spp.tmpDir, "tusd-s3-tmp-")
+		if err != nil {
+			return fileChunk{}, false, err
+		}
+
+		limitedReader := io.LimitReader(spp.r, size)
+		start := time.Now()
+
+		n, err := io.Copy(file, limitedReader)
+		if err != nil {
+			return fileChunk{}, false, err
+		}
+
+		// If the entire request body is read and no more data is available,
+		// io.Copy returns 0 since it is unable to read any bytes. In that
+		// case, we can close the s3PartProducer.
+		if n == 0 {
+			cleanUpTempFile(file)
+			return fileChunk{}, false, nil
+		}
+
+		elapsed := time.Now().Sub(start)
+		ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
+		spp.diskWriteDurationMetric.Observe(ms)
+
+		// Seek to the beginning of the file
+		file.Seek(0, 0)
+
+		return fileChunk{
+			reader: file,
+			closeReader: func() {
+				file.Close()
+				os.Remove(file.Name())
+			},
+			size: n,
+		}, true, nil
+	} else {
+		// Create a temporary buffer to store the part
+		buf := new(bytes.Buffer)
+
+		limitedReader := io.LimitReader(spp.r, size)
+		start := time.Now()
+
+		n, err := io.Copy(buf, limitedReader)
+		if err != nil {
+			return fileChunk{}, false, err
+		}
+
+		// If the entire request body is read and no more data is available,
+		// io.Copy returns 0 since it is unable to read any bytes. In that
+		// case, we can close the s3PartProducer.
+		if n == 0 {
+			return fileChunk{}, false, nil
+		}
+
+		elapsed := time.Now().Sub(start)
+		ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
+		spp.diskWriteDurationMetric.Observe(ms)
+
+		return fileChunk{
+			// buf does not get written to anymore, so we can turn it into a reader
+			reader:      bytes.NewReader(buf.Bytes()),
+			closeReader: func() {},
+			size:        n,
+		}, true, nil
 	}
-
-	limitedReader := io.LimitReader(spp.r, size)
-	start := time.Now()
-
-	n, err := io.Copy(file, limitedReader)
-	if err != nil {
-		return fileChunk{}, false, err
-	}
-
-	// If the entire request body is read and no more data is available,
-	// io.Copy returns 0 since it is unable to read any bytes. In that
-	// case, we can close the s3PartProducer.
-	if n == 0 {
-		cleanUpTempFile(file)
-		return fileChunk{}, false, nil
-	}
-
-	elapsed := time.Now().Sub(start)
-	ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
-	spp.diskWriteDurationMetric.Observe(ms)
-
-	// Seek to the beginning of the file
-	file.Seek(0, 0)
-
-	return fileChunk{
-		file: file,
-		size: n,
-	}, true, nil
 }
diff --git a/pkg/s3store/s3store_part_producer_test.go b/pkg/s3store/s3store_part_producer_test.go
index f4a25cb..7a13643 100644
--- a/pkg/s3store/s3store_part_producer_test.go
+++ b/pkg/s3store/s3store_part_producer_test.go
@@ -2,7 +2,6 @@ package s3store
 
 import (
 	"errors"
-	"os"
 	"strings"
 	"testing"
 	"time"
@@ -34,7 +33,7 @@ func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
 	actualStr := ""
 	b := make([]byte, 1)
 	for chunk := range fileChan {
-		n, err := chunk.file.Read(b)
+		n, err := chunk.reader.Read(b)
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
 		}
@@ -46,8 +45,7 @@ func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
 		}
 		actualStr += string(b)
 
-		os.Remove(chunk.file.Name())
-		chunk.file.Close()
+		chunk.closeReader()
 	}
 
 	if actualStr != expectedStr {
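Note (not part of the patch itself): as the hunks above show, the in-memory mode is toggled only through the TUSD_S3STORE_TEMP_MEMORY environment variable. When it is set to "1", newS3PartProducer replaces the configured temporary directory with the sentinel TEMP_DIR_USE_MEMORY ("_memory"), and nextPart then buffers each part in a bytes.Buffer instead of a temporary file, handing consumers an io.ReadSeeker plus a closeReader callback in both cases. The sketch below only restates that toggle; effectiveTempDir is a hypothetical helper invented for this illustration, while the constant and the environment variable name come from the diff.

	// Illustrative sketch, not code from the patch: it mirrors the env-var
	// check that newS3PartProducer performs inline.
	package main

	import (
		"fmt"
		"os"
	)

	// Sentinel taken from the patch: signals "buffer parts in memory" instead
	// of writing them to a temporary directory.
	const TEMP_DIR_USE_MEMORY = "_memory"

	// effectiveTempDir is a hypothetical helper used only in this sketch.
	func effectiveTempDir(tmpDir string) string {
		if os.Getenv("TUSD_S3STORE_TEMP_MEMORY") == "1" {
			return TEMP_DIR_USE_MEMORY
		}
		return tmpDir
	}

	func main() {
		os.Setenv("TUSD_S3STORE_TEMP_MEMORY", "1")
		fmt.Println(effectiveTempDir(os.TempDir())) // prints "_memory"
	}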