Add first draft of parallel upload queue
parent 8c5192c254
commit 86a329cef2
@@ -147,7 +147,8 @@ type S3Store struct {
 	// the client and stored on disk while a part is being uploaded to S3. This
 	// can help improve throughput by not blocking the client while tusd is
 	// communicating with the S3 API, which can have unpredictable latency.
 	MaxBufferedParts int64
+	ConcurrentPartUploads int64
 	// TemporaryDirectory is the path where S3Store will create temporary files
 	// on disk during the upload. An empty string ("", the default value) will
 	// cause S3Store to use the operating system's default temporary directory.
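For orientation, both knobs end up on the store's public configuration. A minimal sketch of how a consumer might set them, assuming the tusd 1.x import path and AWS SDK v1 session setup; the ConcurrentPartUploads value is illustrative, and this draft does not yet consume the field anywhere:

package main

import (
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"

	"github.com/tus/tusd/pkg/s3store"
)

func main() {
	sess := session.Must(session.NewSession())
	store := s3store.New("my-bucket", s3.New(sess))

	// Buffer up to 20 parts on disk while earlier parts are still uploading.
	store.MaxBufferedParts = 20
	// Hypothetical setting: upload up to 4 parts to S3 in parallel.
	store.ConcurrentPartUploads = 4

	_ = store
}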
@@ -173,10 +174,6 @@ type S3API interface {
 	UploadPartCopyWithContext(ctx context.Context, input *s3.UploadPartCopyInput, opt ...request.Option) (*s3.UploadPartCopyOutput, error)
 }
 
-type s3APIForPresigning interface {
-	UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput)
-}
-
 // New constructs a new storage using the supplied bucket and service object.
 func New(bucket string, service S3API) S3Store {
 	return S3Store{
@@ -190,6 +187,8 @@ func New(bucket string, service S3API) S3Store {
 		MaxBufferedParts:   20,
 		TemporaryDirectory: "",
 	}
+
+	// TODO: Start goroutine
 }
 
 // UseIn sets this store as the core data store in the passed composer and adds
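Note that the new TODO sits after the return statement, so as written it marks unreachable code; it reads as a placeholder for where the constructor should eventually start the queue's workers. Since S3Store has no queue field in this draft, the eventual wiring can only be guessed at; one plausible sketch, with uploadQueue as a purely hypothetical field name:

func New(bucket string, service S3API) S3Store {
	store := S3Store{ /* defaults as above */ }
	// Hypothetical follow-up, not part of this commit: start the background
	// workers that drain the part-upload queue.
	store.uploadQueue = newS3UploadQueue(service,
		store.ConcurrentPartUploads, store.MaxBufferedParts, store.DisableContentHashes)
	return store
}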
@@ -322,6 +321,8 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
 	}
 
 	// Get number of parts to generate next number
+	// TODO: Remove redundant call to listAllParts and use result from GetInfo, which already
+	// calls listAllParts.
 	parts, err := store.listAllParts(ctx, id)
 	if err != nil {
 		return 0, err
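This TODO describes a small optimization: WriteChunk currently lists all parts itself even though GetInfo has just done the same work. A rough sketch of the intended reuse, where fetchInfoWithParts is a hypothetical helper that does not exist in this commit:

// Hypothetical: a GetInfo-style helper that also returns the parts it listed,
// so WriteChunk can derive the next part number without a second S3 call.
info, parts, err := upload.fetchInfoWithParts(ctx)
if err != nil {
	return 0, err
}
// Part numbers start at 1, so with len(parts) parts uploaded the next one is:
nextPartNum := int64(len(parts)) + 1
_ = info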
@@ -0,0 +1,134 @@
+package s3store
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws/request"
+	"github.com/aws/aws-sdk-go/service/s3"
+)
+
+type s3UploadQueue struct {
+	service              S3API
+	queue                chan *s3UploadJob
+	disableContentHashes bool
+}
+
+type s3UploadJob struct {
+	// These fields are filled out by the job creator, i.e. the entity creating the job.
+	ctx             context.Context
+	uploadPartInput *s3.UploadPartInput
+	file            *os.File
+	size            int64
+	resultChannel   chan<- *s3UploadJob
+
+	// These fields are filled out by the job worker, i.e. the entity doing the actual upload.
+	etag string
+	err  error
+}
+
+type s3APIForPresigning interface {
+	UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput)
+}
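A job is both request and reply: the creator fills in the upper fields plus a result channel, and the worker writes etag and err back into the same struct before sending it over that channel. In producer code that might look roughly like this, assuming the usual aws helper import for aws.String and aws.Int64; ctx, multipartId, nextPartNum, partFile, partSize, and queue are placeholders:

results := make(chan *s3UploadJob, 1)
queue.push(&s3UploadJob{
	ctx: ctx,
	uploadPartInput: &s3.UploadPartInput{
		Bucket:     aws.String("my-bucket"),
		Key:        aws.String("my-key"),
		UploadId:   aws.String(multipartId),
		PartNumber: aws.Int64(nextPartNum),
	},
	file:          partFile, // temporary file holding this part's bytes
	size:          partSize,
	resultChannel: results,
})

job := <-results // the same *s3UploadJob, now with etag and err filled in
if job.err != nil {
	return job.err
}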
+
+// newS3UploadQueue creates a new upload queue and starts the workers in goroutines.
+func newS3UploadQueue(service S3API, concurrency int64, maxBufferedParts int64, disableContentHashes bool) *s3UploadQueue {
+	s := &s3UploadQueue{
+		service:              service,
+		queue:                make(chan *s3UploadJob, maxBufferedParts),
+		disableContentHashes: disableContentHashes,
+	}
+
+	for i := int64(0); i < concurrency; i++ {
+		go s.uploadLoop()
+	}
+
+	return s
+}
+
+// queueLength returns the number of items waiting in the queue.
+func (s s3UploadQueue) queueLength() int {
+	return len(s.queue)
+}
+
+// push appends another job to the queue and returns without waiting for the
+// upload to complete. It blocks if the queue's buffer is already full.
+func (s s3UploadQueue) push(job *s3UploadJob) {
+	s.queue <- job
+}
+
+// stop instructs the worker goroutines to shut down after the remaining queued
+// jobs have been processed. The function returns immediately without waiting
+// for the shutdown.
+func (s s3UploadQueue) stop() {
+	close(s.queue)
+}
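Two semantics are worth noting here. The channel buffer is sized by maxBufferedParts, so push exerts backpressure: once that many jobs are waiting it blocks instead of buffering parts without bound. And because stop closes the channel, workers drain whatever is still queued before exiting, not just their current job. If a caller ever needed to abandon a blocked push when its request is canceled, a select would do it; this is a sketch of the idea, not something the draft does:

select {
case s.queue <- job:
	// Enqueued; a worker will deliver the job on job.resultChannel.
case <-job.ctx.Done():
	// The client went away before a worker slot freed up.
	job.err = job.ctx.Err()
	job.resultChannel <- job
}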
+
+func (s s3UploadQueue) uploadLoop() {
+	for {
+		job, more := <-s.queue
+		if !more {
+			break
+		}
+
+		job.etag, job.err = s.putPartForUpload(job.ctx, job.uploadPartInput, job.file, job.size)
+		job.resultChannel <- job
+	}
+}
+
+func (s s3UploadQueue) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) (etag string, err error) {
+	// TODO: Move this back into s3store where the file is created
+	defer cleanUpTempFile(file)
+
+	if !s.disableContentHashes {
+		// By default, use the traditional approach to upload data
+		uploadPartInput.Body = file
+		res, err := s.service.UploadPartWithContext(ctx, uploadPartInput)
+		if res != nil && res.ETag != nil {
+			etag = *res.ETag
+		}
+		return etag, err
+	} else {
+		// Experimental feature to prevent the AWS SDK from calculating the SHA256 hash
+		// for the parts we upload to S3.
+		// We compute the presigned URL without the body attached and then send the request
+		// on our own. This way, the body is not included in the SHA256 calculation.
+		s3api, ok := s.service.(s3APIForPresigning)
+		if !ok {
+			return "", fmt.Errorf("s3store: failed to cast S3 service for presigning")
+		}
+
+		s3Req, _ := s3api.UploadPartRequest(uploadPartInput)
+
+		url, err := s3Req.Presign(15 * time.Minute)
+		if err != nil {
+			return "", err
+		}
+
+		req, err := http.NewRequest("PUT", url, file)
+		if err != nil {
+			return "", err
+		}
+
+		// Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked,
+		// which is not supported by AWS S3.
+		req.ContentLength = size
+
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return "", err
+		}
+		defer res.Body.Close()
+
+		if res.StatusCode != 200 {
+			buf := new(strings.Builder)
+			io.Copy(buf, res.Body)
+			return "", fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String())
+		}
+
+		return res.Header.Get("ETag"), nil
+	}
+}
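One detail in the presigned branch deserves a note: http.NewRequest only infers Content-Length for *bytes.Buffer, *bytes.Reader, and *strings.Reader bodies. An *os.File leaves req.ContentLength at zero, and a non-nil body of unknown length makes the transport fall back to Transfer-Encoding: chunked, which S3 rejects. Hence the manual assignment above; if the size were not already tracked by the job, it could be recovered from the file itself, as in this sketch (the path is a placeholder):

f, err := os.Open("part.bin")
if err != nil {
	return "", err
}
req, err := http.NewRequest("PUT", url, f)
if err != nil {
	return "", err
}
fi, err := f.Stat()
if err != nil {
	return "", err
}
req.ContentLength = fi.Size() // avoids chunked encoding on the wire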