s3store: Implement temporary support for buffering in memory

Marius 2022-04-10 21:13:07 +02:00
parent aace4704d7
commit c0f2026e96
3 changed files with 90 additions and 50 deletions
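What the change does, in short: each upload part is staged either in a temporary file on disk (the existing behaviour) or, when the environment variable TUSD_S3STORE_TEMP_MEMORY=1 is set, in an in-memory buffer. Both variants are handed to the upload goroutines behind the same fileChunk shape, an io.ReadSeeker plus a closeReader callback, so consumers no longer care where the bytes live. A minimal standalone sketch of that abstraction (memoryChunk is a hypothetical helper, not tusd code):

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// fileChunk mirrors the struct introduced by this commit: a seekable
// reader plus a callback that releases whatever backs it.
type fileChunk struct {
	reader      io.ReadSeeker
	closeReader func()
	size        int64
}

// memoryChunk stages up to size bytes of src in memory, analogous to the
// new in-memory branch of nextPart.
func memoryChunk(src io.Reader, size int64) (fileChunk, error) {
	buf := new(bytes.Buffer)
	n, err := io.Copy(buf, io.LimitReader(src, size))
	if err != nil {
		return fileChunk{}, err
	}
	return fileChunk{
		// buf is not written to afterwards, so exposing its bytes is safe.
		reader:      bytes.NewReader(buf.Bytes()),
		closeReader: func() {}, // nothing to release for a memory buffer
		size:        n,
	}, nil
}

func main() {
	chunk, err := memoryChunk(strings.NewReader("hello, s3 part"), 64)
	if err != nil {
		panic(err)
	}
	defer chunk.closeReader()
	data, _ := io.ReadAll(chunk.reader)
	fmt.Printf("staged %d bytes: %s\n", chunk.size, data)
}

With TUSD_S3STORE_TEMP_MEMORY unset, parts keep going to tmpDir as before; the "temporary" in the title suggests the raw environment variable is an interim switch rather than a stable configuration option.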

pkg/s3store/s3store.go

@@ -461,8 +461,9 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 			break
 		}
 
-		partfile := fileChunk.file
+		partfile := fileChunk.reader
 		partsize := fileChunk.size
+		closePart := fileChunk.closeReader
 
 		isFinalChunk := !info.SizeIsDeferred && (size == offset+bytesUploaded+partsize)
 		if partsize >= store.MinPartSize || isFinalChunk {
@@ -474,9 +475,10 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 			upload.parts = append(upload.parts, part)
 
 			wg.Add(1)
-			go func(file *os.File, part *s3Part) {
+			go func(file io.ReadSeeker, part *s3Part, closePart func()) {
 				defer upload.store.uploadSemaphore.Release()
 				defer wg.Done()
+				defer closePart()
 
 				t := time.Now()
 				uploadPartInput := &s3.UploadPartInput{
@@ -492,18 +494,19 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 				} else {
 					part.etag = etag
 				}
-			}(partfile, part)
+			}(partfile, part, closePart)
 		} else {
 			wg.Add(1)
-			go func(file *os.File) {
+			go func(file io.ReadSeeker, closePart func()) {
 				defer upload.store.uploadSemaphore.Release()
 				defer wg.Done()
+				defer closePart()
 
 				if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil {
 					uploadErr = err
 				}
 				upload.incompletePartSize = partsize
-			}(partfile)
+			}(partfile, closePart)
 		}
 
 		bytesUploaded += partsize
@@ -524,9 +527,7 @@ func cleanUpTempFile(file *os.File) {
 	os.Remove(file.Name())
 }
 
-func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) (string, error) {
-	defer cleanUpTempFile(file)
-
+func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file io.ReadSeeker, size int64) (string, error) {
 	if !upload.store.DisableContentHashes {
 		// By default, use the traditional approach to upload data
 		uploadPartInput.Body = file
@@ -1101,9 +1102,7 @@ func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId s
 	return *obj.ContentLength, nil
 }
 
-func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
-	defer cleanUpTempFile(file)
-
+func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file io.ReadSeeker) error {
 	t := time.Now()
 	_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
 		Bucket: aws.String(store.Bucket),
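A note on the cleanup change above: putPartForUpload and putIncompletePartForUpload previously deleted the temporary file themselves via defer cleanUpTempFile(file). Since a chunk may now be backed by memory instead of a file, disposal moves behind the closeReader callback, and each upload goroutine defers it so a chunk is released exactly once regardless of its backing. A rough sketch of that ownership pattern (uploadPart here is a stand-in for the real S3 call, not the actual API):

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// uploadPart stands in for the real S3 UploadPart request.
func uploadPart(r io.ReadSeeker) error {
	_, err := io.Copy(io.Discard, r)
	return err
}

func main() {
	chunks := []struct {
		reader    io.ReadSeeker
		closePart func()
	}{
		{strings.NewReader("part-1"), func() { fmt.Println("released part-1") }},
		{strings.NewReader("part-2"), func() { fmt.Println("released part-2") }},
	}

	var wg sync.WaitGroup
	for _, c := range chunks {
		wg.Add(1)
		// The reader and its cleanup travel together into the goroutine,
		// mirroring go func(file io.ReadSeeker, ..., closePart func()) above.
		go func(r io.ReadSeeker, closePart func()) {
			defer wg.Done()
			defer closePart() // runs whether the upload succeeds or fails
			if err := uploadPart(r); err != nil {
				fmt.Println("upload failed:", err)
			}
		}(c.reader, c.closePart)
	}
	wg.Wait()
}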

pkg/s3store/s3store_part_producer.go

@@ -1,6 +1,7 @@
 package s3store
 
 import (
+	"bytes"
 	"io"
 	"io/ioutil"
 	"os"
@@ -9,6 +10,8 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+const TEMP_DIR_USE_MEMORY = "_memory"
+
 // s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
 type s3PartProducer struct {
 	tmpDir string
@@ -20,14 +23,19 @@
 }
 
 type fileChunk struct {
-	file *os.File
-	size int64
+	reader      io.ReadSeeker
+	closeReader func()
+	size        int64
 }
 
 func newS3PartProducer(source io.Reader, backlog int64, tmpDir string, diskWriteDurationMetric prometheus.Summary) (s3PartProducer, <-chan fileChunk) {
 	fileChan := make(chan fileChunk, backlog)
 	doneChan := make(chan struct{})
 
+	if os.Getenv("TUSD_S3STORE_TEMP_MEMORY") == "1" {
+		tmpDir = TEMP_DIR_USE_MEMORY
+	}
+
 	partProducer := s3PartProducer{
 		tmpDir: tmpDir,
 		done:   doneChan,
@@ -47,7 +55,7 @@ func (spp *s3PartProducer) stop() {
 
 	// If we return while there are still files in the channel, then
 	// we may leak file descriptors. Let's ensure that those are cleaned up.
 	for fileChunk := range spp.files {
-		cleanUpTempFile(fileChunk.file)
+		fileChunk.closeReader()
 	}
 }
@@ -76,37 +84,72 @@ outerloop:
 }
 
 func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
-	// Create a temporary file to store the part
-	file, err := ioutil.TempFile(spp.tmpDir, "tusd-s3-tmp-")
-	if err != nil {
-		return fileChunk{}, false, err
-	}
-
-	limitedReader := io.LimitReader(spp.r, size)
-	start := time.Now()
-
-	n, err := io.Copy(file, limitedReader)
-	if err != nil {
-		return fileChunk{}, false, err
-	}
-
-	// If the entire request body is read and no more data is available,
-	// io.Copy returns 0 since it is unable to read any bytes. In that
-	// case, we can close the s3PartProducer.
-	if n == 0 {
-		cleanUpTempFile(file)
-		return fileChunk{}, false, nil
-	}
-
-	elapsed := time.Now().Sub(start)
-	ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
-	spp.diskWriteDurationMetric.Observe(ms)
-
-	// Seek to the beginning of the file
-	file.Seek(0, 0)
-
-	return fileChunk{
-		file: file,
-		size: n,
-	}, true, nil
+	if spp.tmpDir != TEMP_DIR_USE_MEMORY {
+		// Create a temporary file to store the part
+		file, err := ioutil.TempFile(spp.tmpDir, "tusd-s3-tmp-")
+		if err != nil {
+			return fileChunk{}, false, err
+		}
+
+		limitedReader := io.LimitReader(spp.r, size)
+		start := time.Now()
+
+		n, err := io.Copy(file, limitedReader)
+		if err != nil {
+			return fileChunk{}, false, err
+		}
+
+		// If the entire request body is read and no more data is available,
+		// io.Copy returns 0 since it is unable to read any bytes. In that
+		// case, we can close the s3PartProducer.
+		if n == 0 {
+			cleanUpTempFile(file)
+			return fileChunk{}, false, nil
+		}
+
+		elapsed := time.Now().Sub(start)
+		ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
+		spp.diskWriteDurationMetric.Observe(ms)
+
+		// Seek to the beginning of the file
+		file.Seek(0, 0)
+
+		return fileChunk{
+			reader: file,
+			closeReader: func() {
+				file.Close()
+				os.Remove(file.Name())
+			},
+			size: n,
+		}, true, nil
+	} else {
+		// Create a temporary buffer to store the part
+		buf := new(bytes.Buffer)
+
+		limitedReader := io.LimitReader(spp.r, size)
+		start := time.Now()
+
+		n, err := io.Copy(buf, limitedReader)
+		if err != nil {
+			return fileChunk{}, false, err
+		}
+
+		// If the entire request body is read and no more data is available,
+		// io.Copy returns 0 since it is unable to read any bytes. In that
+		// case, we can close the s3PartProducer.
+		if n == 0 {
+			return fileChunk{}, false, nil
+		}
+
+		elapsed := time.Now().Sub(start)
+		ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
+		spp.diskWriteDurationMetric.Observe(ms)
+
+		return fileChunk{
+			// buf does not get written to anymore, so we can turn it into a reader
+			reader: bytes.NewReader(buf.Bytes()),
+			closeReader: func() {},
+			size: n,
+		}, true, nil
+	}
 }
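For comparison with the in-memory branch, the disk branch keeps its previous behaviour but now packages its cleanup into closeReader instead of relying on cleanUpTempFile at the call sites. A standalone sketch of that packaging (diskChunk is a hypothetical helper following the diff's logic):

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"
)

// diskChunk mirrors the disk branch of nextPart: stage up to size bytes
// in a temp file, rewind it, and bundle close+remove into one callback.
func diskChunk(tmpDir string, src io.Reader, size int64) (io.ReadSeeker, func(), int64, error) {
	file, err := ioutil.TempFile(tmpDir, "tusd-s3-tmp-")
	if err != nil {
		return nil, nil, 0, err
	}
	cleanup := func() {
		file.Close()
		os.Remove(file.Name())
	}
	n, err := io.Copy(file, io.LimitReader(src, size))
	if err != nil {
		cleanup()
		return nil, nil, 0, err
	}
	// Rewind so the consumer reads the part from the beginning.
	if _, err := file.Seek(0, io.SeekStart); err != nil {
		cleanup()
		return nil, nil, 0, err
	}
	return file, cleanup, n, nil
}

func main() {
	r, closeReader, n, err := diskChunk(os.TempDir(), strings.NewReader("spilled to disk"), 64)
	if err != nil {
		panic(err)
	}
	defer closeReader() // closes and deletes the temp file
	data, _ := io.ReadAll(r)
	fmt.Printf("staged %d bytes on disk: %s\n", n, data)
}

Bundling close and remove into a single closure is what lets stop() drain the channel and release leftover chunks uniformly, whether they are files or buffers.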

pkg/s3store/s3store_part_producer_test.go

@@ -2,7 +2,6 @@ package s3store
 
 import (
 	"errors"
-	"os"
 	"strings"
 	"testing"
 	"time"
@@ -34,7 +33,7 @@ func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
 	actualStr := ""
 	b := make([]byte, 1)
 	for chunk := range fileChan {
-		n, err := chunk.file.Read(b)
+		n, err := chunk.reader.Read(b)
 		if err != nil {
 			t.Fatalf("unexpected error: %s", err)
 		}
@@ -46,8 +45,7 @@ func TestPartProducerConsumesEntireReaderWithoutError(t *testing.T) {
 		}
 
 		actualStr += string(b)
 
-		os.Remove(chunk.file.Name())
-		chunk.file.Close()
+		chunk.closeReader()
 	}
 	if actualStr != expectedStr {