diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index d3f7224..2a62236 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -272,6 +272,70 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er return err } +type s3ChunkProducer struct { + files chan<- *os.File + done chan struct{} + err error + r io.Reader +} + +func (scp *s3ChunkProducer) produce(chunkSize int64) { + for { + file, err := scp.nextChunk(chunkSize) + if err != nil { + scp.err = err + close(scp.files) + return + } + if file == nil { + close(scp.files) + return + } + select { + case scp.files <- file: + case <-scp.done: + close(scp.files) + return + } + } +} + +func (scp *s3ChunkProducer) nextChunk(size int64) (*os.File, error) { + // Create a temporary file to store the part + file, err := ioutil.TempFile("", "tusd-s3-tmp-") + if err != nil { + return nil, err + } + + limitedReader := io.LimitReader(scp.r, size) + n, err := io.Copy(file, limitedReader) + + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for S3Store it's not important whether the stream has ended + // on purpose or accidentally. Therefore, we ignore this error to not + // prevent the remaining chunk to be stored on S3. + if err == io.ErrUnexpectedEOF { + err = nil + } + + if err != nil { + return nil, err + } + + // io.Copy returns (0, nil) when it reaches EOF + if n == 0 { + os.Remove(file.Name()) + file.Close() + return nil, nil + } + + // Seek to the beginning of the file + file.Seek(0, 0) + + return file, nil +} + func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { id := upload.id store := upload.store