diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 4d5e900..fe64b3c 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -147,7 +147,8 @@ type S3Store struct { // the client and stored on disk while a part is being uploaded to S3. This // can help improve throughput by not blocking the client while tusd is // communicating with the S3 API, which can have unpredictable latency. - MaxBufferedParts int64 + MaxBufferedParts int64 + ConcurrentPartUploads int64 // TemporaryDirectory is the path where S3Store will create temporary files // on disk during the upload. An empty string ("", the default value) will // cause S3Store to use the operating system's default temporary directory. @@ -173,10 +174,6 @@ type S3API interface { UploadPartCopyWithContext(ctx context.Context, input *s3.UploadPartCopyInput, opt ...request.Option) (*s3.UploadPartCopyOutput, error) } -type s3APIForPresigning interface { - UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput) -} - // New constructs a new storage using the supplied bucket and service object. func New(bucket string, service S3API) S3Store { return S3Store{ @@ -190,6 +187,8 @@ func New(bucket string, service S3API) S3Store { MaxBufferedParts: 20, TemporaryDirectory: "", } + + // TODO: Start goroutine } // UseIn sets this store as the core data store in the passed composer and adds @@ -322,6 +321,8 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read } // Get number of parts to generate next number + // TODO: Remove redudant call to listAllParts and use result from GetInfo, which already + // calls listAllParts. parts, err := store.listAllParts(ctx, id) if err != nil { return 0, err diff --git a/pkg/s3store/s3store_upload_queue.go b/pkg/s3store/s3store_upload_queue.go new file mode 100644 index 0000000..9707e7b --- /dev/null +++ b/pkg/s3store/s3store_upload_queue.go @@ -0,0 +1,134 @@ +package s3store + +import ( + "context" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/s3" +) + +type s3UploadQueue struct { + service S3API + queue chan *s3UploadJob + disableContentHashes bool +} + +type s3UploadJob struct { + // These fields are filled out be the job customer, i.e. the entity creating job. + ctx context.Context + uploadPartInput *s3.UploadPartInput + file *os.File + size int64 + resultChannel chan<- *s3UploadJob + + // These fields are filled out by the job worker, i.e. the entity doing the actual upload. + etag string + err error +} + +type s3APIForPresigning interface { + UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput) +} + +// newS3UploadQueue create a new upload queue and starts the workers in the goroutines. +func newS3UploadQueue(service S3API, concurrency int64, maxBufferedParts int64, disableContentHashes bool) *s3UploadQueue { + s := &s3UploadQueue{ + service: service, + queue: make(chan *s3UploadJob, maxBufferedParts), + disableContentHashes: disableContentHashes, + } + + for i := 0; i < concurrency; i++ { + go s.uploadLoop() + } + + return s +} + +// queueLength returns the number of item waiting in the queue. +func (s s3UploadQueue) queueLength() int { + return len(s.queue) +} + +// push appends another item to the queue and returns immediately. +func (s s3UploadQueue) push(job *s3UploadJob) { + s.queue <- job +} + +// stop instructs the worker goroutines to shutdown after they complete their +// current task. The function immediately returns without waiting for the shutdown. +func (s s3UploadQueue) stop() { + close(s.queue) +} + +func (s s3UploadQueue) uploadLoop() { + for { + job, more := <-s.queue + if !more { + break + } + + job.etag, job.err = s.putPartForUpload(job.ctx, job.uploadPartInput, job.file, job.size) + job.resultChannel <- job + } +} + +func (s s3UploadQueue) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) (etag string, err error) { + // TODO: Move this back into s3store where the file is created + defer cleanUpTempFile(file) + + if !s.disableContentHashes { + // By default, use the traditional approach to upload data + uploadPartInput.Body = file + res, err := s.service.UploadPartWithContext(ctx, uploadPartInput) + if res.ETag != nil { + etag = *res.ETag + } + return etag, err + } else { + // Experimental feature to prevent the AWS SDK from calculating the SHA256 hash + // for the parts we upload to S3. + // We compute the presigned URL without the body attached and then send the request + // on our own. This way, the body is not included in the SHA256 calculation. + s3api, ok := s.service.Service.(s3APIForPresigning) + if !ok { + return fmt.Errorf("s3store: failed to cast S3 service for presigning") + } + + s3Req, _ := s3api.UploadPartRequest(uploadPartInput) + + url, err := s3Req.Presign(15 * time.Minute) + if err != nil { + return "", err + } + + req, err := http.NewRequest("PUT", url, file) + if err != nil { + return "", err + } + + // Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked, + // which is not supported by AWS S3. + req.ContentLength = size + + res, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer res.Body.Close() + + if res.StatusCode != 200 { + buf := new(strings.Builder) + io.Copy(buf, res.Body) + return "", fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String()) + } + + return res.Header.Get("ETag"), nil + } +}