2021-04-26 10:18:09 +00:00
|
|
|
package s3store
|
|
|
|
|
|
|
|
import (
|
2022-04-10 19:13:07 +00:00
|
|
|
"bytes"
|
2021-04-26 10:18:09 +00:00
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
|
|
|
"os"
|
2021-05-28 11:26:13 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
2021-04-26 10:18:09 +00:00
|
|
|
)
|
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
// TEMP_DIR_USE_MEMORY is the sentinel value for a part producer's temporary
// directory. When the directory equals this value, parts are buffered in
// memory instead of being written to temporary files on disk.
const (
	TEMP_DIR_USE_MEMORY = "_memory"
)
|
|
|
|
|
2021-04-26 10:18:09 +00:00
|
|
|
// s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
|
|
|
|
type s3PartProducer struct {
|
2021-05-28 11:26:13 +00:00
|
|
|
tmpDir string
|
|
|
|
files chan fileChunk
|
|
|
|
done chan struct{}
|
|
|
|
err error
|
|
|
|
r io.Reader
|
|
|
|
diskWriteDurationMetric prometheus.Summary
|
2021-05-18 08:29:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// fileChunk describes a single part that was read from the source and is
// ready to be consumed.
type fileChunk struct {
	// reader provides the part's data, positioned at the beginning.
	reader io.ReadSeeker
	// closeReader releases whatever backs reader. For parts stored on disk
	// it closes and removes the temporary file; for in-memory parts it is
	// a no-op. It must be called once the chunk is no longer needed.
	closeReader func()
	// size is the number of bytes available from reader.
	size int64
}
|
|
|
|
|
2021-05-28 11:26:13 +00:00
|
|
|
func newS3PartProducer(source io.Reader, backlog int64, tmpDir string, diskWriteDurationMetric prometheus.Summary) (s3PartProducer, <-chan fileChunk) {
|
2021-05-18 08:29:18 +00:00
|
|
|
fileChan := make(chan fileChunk, backlog)
|
|
|
|
doneChan := make(chan struct{})
|
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
if os.Getenv("TUSD_S3STORE_TEMP_MEMORY") == "1" {
|
|
|
|
tmpDir = TEMP_DIR_USE_MEMORY
|
|
|
|
}
|
|
|
|
|
2021-05-18 08:29:18 +00:00
|
|
|
partProducer := s3PartProducer{
|
2021-05-28 11:26:13 +00:00
|
|
|
tmpDir: tmpDir,
|
|
|
|
done: doneChan,
|
|
|
|
files: fileChan,
|
|
|
|
r: source,
|
|
|
|
diskWriteDurationMetric: diskWriteDurationMetric,
|
2021-05-18 08:29:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return partProducer, fileChan
|
|
|
|
}
|
|
|
|
|
|
|
|
// stop should always be called by the consumer to ensure that the channels
|
|
|
|
// are properly closed and emptied.
|
|
|
|
func (spp *s3PartProducer) stop() {
|
|
|
|
close(spp.done)
|
|
|
|
|
|
|
|
// If we return while there are still files in the channel, then
|
|
|
|
// we may leak file descriptors. Let's ensure that those are cleaned up.
|
|
|
|
for fileChunk := range spp.files {
|
2022-04-10 19:13:07 +00:00
|
|
|
fileChunk.closeReader()
|
2021-05-18 08:29:18 +00:00
|
|
|
}
|
2021-04-26 10:18:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (spp *s3PartProducer) produce(partSize int64) {
|
2021-05-18 08:29:18 +00:00
|
|
|
outerloop:
|
2021-04-26 10:18:09 +00:00
|
|
|
for {
|
2021-05-18 08:29:18 +00:00
|
|
|
file, ok, err := spp.nextPart(partSize)
|
2021-04-26 10:18:09 +00:00
|
|
|
if err != nil {
|
2021-05-18 08:29:18 +00:00
|
|
|
// An error occured. Stop producing.
|
2021-04-26 10:18:09 +00:00
|
|
|
spp.err = err
|
2021-05-18 08:29:18 +00:00
|
|
|
break
|
2021-04-26 10:18:09 +00:00
|
|
|
}
|
2021-05-18 08:29:18 +00:00
|
|
|
if !ok {
|
|
|
|
// The source was fully read. Stop producing.
|
|
|
|
break
|
2021-04-26 10:18:09 +00:00
|
|
|
}
|
|
|
|
select {
|
|
|
|
case spp.files <- file:
|
|
|
|
case <-spp.done:
|
2021-05-18 08:29:18 +00:00
|
|
|
// We are told to stop producing. Stop producing.
|
|
|
|
break outerloop
|
2021-04-26 10:18:09 +00:00
|
|
|
}
|
|
|
|
}
|
2021-05-18 08:29:18 +00:00
|
|
|
|
|
|
|
close(spp.files)
|
2021-04-26 10:18:09 +00:00
|
|
|
}
|
|
|
|
|
2021-05-18 08:29:18 +00:00
|
|
|
func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
|
2022-04-10 19:13:07 +00:00
|
|
|
if spp.tmpDir != TEMP_DIR_USE_MEMORY {
|
|
|
|
// Create a temporary file to store the part
|
|
|
|
file, err := ioutil.TempFile(spp.tmpDir, "tusd-s3-tmp-")
|
|
|
|
if err != nil {
|
|
|
|
return fileChunk{}, false, err
|
|
|
|
}
|
2021-04-26 10:18:09 +00:00
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
limitedReader := io.LimitReader(spp.r, size)
|
|
|
|
start := time.Now()
|
2021-05-28 11:26:13 +00:00
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
n, err := io.Copy(file, limitedReader)
|
|
|
|
if err != nil {
|
|
|
|
return fileChunk{}, false, err
|
|
|
|
}
|
2021-04-26 10:18:09 +00:00
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
// If the entire request body is read and no more data is available,
|
|
|
|
// io.Copy returns 0 since it is unable to read any bytes. In that
|
|
|
|
// case, we can close the s3PartProducer.
|
|
|
|
if n == 0 {
|
|
|
|
cleanUpTempFile(file)
|
|
|
|
return fileChunk{}, false, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
elapsed := time.Now().Sub(start)
|
|
|
|
ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
|
|
|
|
spp.diskWriteDurationMetric.Observe(ms)
|
|
|
|
|
|
|
|
// Seek to the beginning of the file
|
|
|
|
file.Seek(0, 0)
|
|
|
|
|
|
|
|
return fileChunk{
|
|
|
|
reader: file,
|
|
|
|
closeReader: func() {
|
|
|
|
file.Close()
|
|
|
|
os.Remove(file.Name())
|
|
|
|
},
|
|
|
|
size: n,
|
|
|
|
}, true, nil
|
|
|
|
} else {
|
|
|
|
// Create a temporary buffer to store the part
|
|
|
|
buf := new(bytes.Buffer)
|
|
|
|
|
|
|
|
limitedReader := io.LimitReader(spp.r, size)
|
|
|
|
start := time.Now()
|
|
|
|
|
|
|
|
n, err := io.Copy(buf, limitedReader)
|
|
|
|
if err != nil {
|
|
|
|
return fileChunk{}, false, err
|
|
|
|
}
|
2021-04-26 10:18:09 +00:00
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
// If the entire request body is read and no more data is available,
|
|
|
|
// io.Copy returns 0 since it is unable to read any bytes. In that
|
|
|
|
// case, we can close the s3PartProducer.
|
|
|
|
if n == 0 {
|
|
|
|
return fileChunk{}, false, nil
|
|
|
|
}
|
2021-05-28 11:26:13 +00:00
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
elapsed := time.Now().Sub(start)
|
|
|
|
ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
|
|
|
|
spp.diskWriteDurationMetric.Observe(ms)
|
2021-04-26 10:18:09 +00:00
|
|
|
|
2022-04-10 19:13:07 +00:00
|
|
|
return fileChunk{
|
|
|
|
// buf does not get written to anymore, so we can turn it into a reader
|
|
|
|
reader: bytes.NewReader(buf.Bytes()),
|
|
|
|
closeReader: func() {},
|
|
|
|
size: n,
|
|
|
|
}, true, nil
|
|
|
|
}
|
2021-04-26 10:18:09 +00:00
|
|
|
}
|