Add s3ChunkProducer

Adam Jensen 2020-03-16 10:12:11 -04:00
parent 6c45ff2f0c
commit 2b3fb49096
1 changed file with 64 additions and 0 deletions

@@ -272,6 +272,70 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {
	return err
}

// s3ChunkProducer converts the byte stream from the reader r into chunks of
// a fixed size, each stored in a temporary file, and sends them on the files
// channel.
type s3ChunkProducer struct {
	files chan<- *os.File
	done  chan struct{}
	err   error
	r     io.Reader
}

// produce repeatedly cuts chunks of up to chunkSize bytes from the reader and
// sends them on the files channel until the reader is exhausted, an error
// occurs, or the done channel is closed. It always closes the files channel
// before returning.
func (scp *s3ChunkProducer) produce(chunkSize int64) {
	for {
		file, err := scp.nextChunk(chunkSize)
		if err != nil {
			scp.err = err
			close(scp.files)
			return
		}
		if file == nil {
			close(scp.files)
			return
		}

		select {
		case scp.files <- file:
		case <-scp.done:
			close(scp.files)
			return
		}
	}
}
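
The intended calling pattern is one goroutine running produce while the caller drains the files channel; closing done asks the producer to stop early. A minimal consumer sketch, assuming a hypothetical helper drainChunks and a per-chunk handler that are not part of this commit:

// Hypothetical consumer of s3ChunkProducer; illustrative only.
func drainChunks(r io.Reader, chunkSize int64, handle func(*os.File) error) error {
	files := make(chan *os.File)
	done := make(chan struct{})
	producer := &s3ChunkProducer{files: files, done: done, r: r}
	go producer.produce(chunkSize)

	for file := range files {
		err := handle(file)
		file.Close()
		os.Remove(file.Name())
		if err != nil {
			close(done) // signal the producer to stop early
			// Drain any chunk already in flight so produce can exit.
			for f := range files {
				f.Close()
				os.Remove(f.Name())
			}
			return err
		}
	}

	// files was closed by the producer; err carries the terminal read error.
	return producer.err
}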

func (scp *s3ChunkProducer) nextChunk(size int64) (*os.File, error) {
	// Create a temporary file to store the part
	file, err := ioutil.TempFile("", "tusd-s3-tmp-")
	if err != nil {
		return nil, err
	}

	limitedReader := io.LimitReader(scp.r, size)
	n, err := io.Copy(file, limitedReader)

	// If the HTTP PATCH request gets interrupted in the middle (e.g. because
	// the user wants to pause the upload), Go's net/http returns an
	// io.ErrUnexpectedEOF. However, for S3Store it is not important whether
	// the stream ended on purpose or accidentally, so we ignore this error
	// and let the data that was already read still be stored on S3.
	if err == io.ErrUnexpectedEOF {
		err = nil
	}
	if err != nil {
		// Clean up the temporary file if the copy failed.
		file.Close()
		os.Remove(file.Name())
		return nil, err
	}

	// io.Copy returns (0, nil) when it reaches EOF
	if n == 0 {
		file.Close()
		os.Remove(file.Name())
		return nil, nil
	}

	// Seek back to the beginning of the file so it can be read from the start
	file.Seek(0, io.SeekStart)

	return file, nil
}
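
The n == 0 check above works because io.Copy reports a drained source as (0, nil) rather than surfacing io.EOF. A standalone illustration of that behavior (the snippet is illustrative and not part of this commit):

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	r := strings.NewReader("abcde")

	// The first copy drains all 5 bytes through an 8-byte limit.
	n, err := io.Copy(ioutil.Discard, io.LimitReader(r, 8))
	fmt.Println(n, err) // 5 <nil>

	// The second copy finds the reader exhausted and returns (0, nil), not io.EOF.
	n, err = io.Copy(ioutil.Discard, io.LimitReader(r, 8))
	fmt.Println(n, err) // 0 <nil>
}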

func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
	id := upload.id
	store := upload.store