Add support for upload termination in S3Store
This commit is contained in:
parent
28a7b9d950
commit
810fa89494
|
@ -10,10 +10,12 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/tus/tusd"
|
||||
"github.com/tus/tusd/uid"
|
||||
|
@ -244,24 +246,64 @@ func (store S3Store) GetReader(id string) (io.Reader, error) {
|
|||
|
||||
func (store S3Store) Terminate(id string) error {
|
||||
uploadId, multipartId := splitIds(id)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
errs := make([]error, 0, 3)
|
||||
|
||||
// Abort the multipart upload first
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
// Abort the multipart upload
|
||||
_, err := store.Service.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Key: aws.String(uploadId),
|
||||
UploadId: aws.String(multipartId),
|
||||
})
|
||||
if err != nil && !isAwsError(err, "NoSuchUpload") {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
// Delete the info and content file
|
||||
res, err := store.Service.DeleteObjects(&s3.DeleteObjectsInput{
|
||||
Bucket: aws.String(store.Bucket),
|
||||
Delete: &s3.Delete{
|
||||
Objects: []*s3.ObjectIdentifier{
|
||||
{
|
||||
Key: aws.String(uploadId),
|
||||
},
|
||||
{
|
||||
Key: aws.String(uploadId + ".info"),
|
||||
},
|
||||
},
|
||||
Quiet: aws.Bool(true),
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if err, ok := err.(awserr.Error); ok && err.Code() == "NoSuchUpload" {
|
||||
// Test whether the multipart upload exists to find out if the upload
|
||||
// never existsted or just has not been finished yet (TODO)
|
||||
return tusd.ErrNotFound
|
||||
errs = append(errs, err)
|
||||
return
|
||||
}
|
||||
|
||||
return err
|
||||
for _, s3Err := range res.Errors {
|
||||
if *s3Err.Code != "NoSuchKey" {
|
||||
errs = append(errs, fmt.Errorf("AWS S3 Error (%s) for object %s: %s", *s3Err.Code, *s3Err.Key, *s3Err.Message))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// TODO delete info file
|
||||
wg.Wait()
|
||||
|
||||
if len(errs) > 0 {
|
||||
message := "Multiple errors occured:\n"
|
||||
for _, err := range errs {
|
||||
message += "\t" + err.Error() + "\n"
|
||||
}
|
||||
return errors.New(message)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -442,3 +442,80 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
|
|||
assert.Nil(err)
|
||||
assert.Equal(int64(10), bytesRead)
|
||||
}
|
||||
|
||||
func TestTerminate(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
|
||||
// Order is not important in this situation.
|
||||
s3obj.EXPECT().AbortMultipartUpload(&s3.AbortMultipartUploadInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
}).Return(nil, nil)
|
||||
|
||||
s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Delete: &s3.Delete{
|
||||
Objects: []*s3.ObjectIdentifier{
|
||||
{
|
||||
Key: aws.String("uploadId"),
|
||||
},
|
||||
{
|
||||
Key: aws.String("uploadId.info"),
|
||||
},
|
||||
},
|
||||
Quiet: aws.Bool(true),
|
||||
},
|
||||
}).Return(&s3.DeleteObjectsOutput{}, nil)
|
||||
|
||||
err := store.Terminate("uploadId+multipartId")
|
||||
assert.Nil(err)
|
||||
}
|
||||
|
||||
func TestTerminateWithErrors(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
defer mockCtrl.Finish()
|
||||
assert := assert.New(t)
|
||||
|
||||
s3obj := NewMockS3API(mockCtrl)
|
||||
store := s3store.New("bucket", s3obj)
|
||||
|
||||
// Order is not important in this situation.
|
||||
// NoSuchUpload errors should be ignored
|
||||
s3obj.EXPECT().AbortMultipartUpload(&s3.AbortMultipartUploadInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("uploadId"),
|
||||
UploadId: aws.String("multipartId"),
|
||||
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil))
|
||||
|
||||
s3obj.EXPECT().DeleteObjects(&s3.DeleteObjectsInput{
|
||||
Bucket: aws.String("bucket"),
|
||||
Delete: &s3.Delete{
|
||||
Objects: []*s3.ObjectIdentifier{
|
||||
{
|
||||
Key: aws.String("uploadId"),
|
||||
},
|
||||
{
|
||||
Key: aws.String("uploadId.info"),
|
||||
},
|
||||
},
|
||||
Quiet: aws.Bool(true),
|
||||
},
|
||||
}).Return(&s3.DeleteObjectsOutput{
|
||||
Errors: []*s3.Error{
|
||||
{
|
||||
Code: aws.String("hello"),
|
||||
Key: aws.String("uploadId"),
|
||||
Message: aws.String("it's me."),
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
err := store.Terminate("uploadId+multipartId")
|
||||
assert.Equal("Multiple errors occured:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue