S3Store: fix GCS delete failure with single deletes fallback

This commit is contained in:
Andy Hovingh 2021-05-20 14:01:46 -05:00
parent d560c4e753
commit 50bd188e34
2 changed files with 194 additions and 2 deletions

View File

@ -614,9 +614,40 @@ func (upload s3Upload) Terminate(ctx context.Context) error {
}) })
if err != nil { if err != nil {
if reqErr, ok := err.(awserr.RequestFailure); ok && reqErr.StatusCode() == 400 {
// this might be a call to something like Google Cloud Storage, which does not
// support multi-delete https://cloud.google.com/storage/docs/migrating#methods-comparison and would
// return a Bad Request error in this case.
// in such case(s) try falling back to single deletes...
var singleDeletesWG sync.WaitGroup
singleDeletesWG.Add(3)
deleteObjectWithKey := func(key *string) {
defer singleDeletesWG.Done()
_, err := store.Service.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(store.Bucket),
Key: key,
})
if err != nil {
if fallbackErr, ok := err.(awserr.RequestFailure); !ok || fallbackErr.Code() != "NoSuchKey" {
errs = append(errs, err) errs = append(errs, err)
return return
} }
}
}
go deleteObjectWithKey(store.keyWithPrefix(uploadId))
go deleteObjectWithKey(store.metadataKeyWithPrefix(uploadId + ".part"))
go deleteObjectWithKey(store.metadataKeyWithPrefix(uploadId + ".info"))
singleDeletesWG.Wait()
} else {
errs = append(errs, err)
return
}
}
for _, s3Err := range res.Errors { for _, s3Err := range res.Errors {
if *s3Err.Code != "NoSuchKey" { if *s3Err.Code != "NoSuchKey" {

View File

@ -1092,6 +1092,167 @@ func TestTerminateWithErrors(t *testing.T) {
assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error()) assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
} }
func TestTerminateWithSingleDeletesFallback(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
// Order is not important in this situation.
// NoSuchUpload errors should be ignored
s3obj.EXPECT().AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil))
// when calling an S3 API (almost) compatible service, multi-delete may not be supported,
// namely Google Cloud Storage:
// https://cloud.google.com/storage/docs/migrating#methods-comparison
// https://github.com/boto/boto3/issues/2293
// in such case(s), the implementation should try single deletes
s3obj.EXPECT().DeleteObjectsWithContext(context.Background(), &s3.DeleteObjectsInput{
Bucket: aws.String("bucket"),
Delete: &s3.Delete{
Objects: []*s3.ObjectIdentifier{
{
Key: aws.String("uploadId"),
},
{
Key: aws.String("uploadId.part"),
},
{
Key: aws.String("uploadId.info"),
},
},
Quiet: aws.Bool(true),
},
}).Return(&s3.DeleteObjectsOutput{}, awserr.NewRequestFailure(
awserr.New(
"InvalidArgument",
"Invalid argument.",
nil,
),
400,
"",
))
// NoSuchUpload errors should be ignored
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
}).Return(&s3.DeleteObjectOutput{}, nil)
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.DeleteObjectOutput{}, awserr.NewRequestFailure(
awserr.New(
"NoSuchKey",
"The specified key does not exist.",
nil,
),
404,
"",
))
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.DeleteObjectOutput{}, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
err = store.AsTerminatableUpload(upload).Terminate(context.Background())
assert.Nil(err)
}
func TestTerminateWithSingleDeletesFallbackAndFallbackErrors(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
assert := assert.New(t)
s3obj := NewMockS3API(mockCtrl)
store := New("bucket", s3obj)
// Order is not important in this situation.
// NoSuchUpload errors should be ignored
s3obj.EXPECT().AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
UploadId: aws.String("multipartId"),
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil))
// when calling an S3 API (almost) compatible service, multi-delete may not be supported,
// namely Google Cloud Storage:
// https://cloud.google.com/storage/docs/migrating#methods-comparison
// https://github.com/boto/boto3/issues/2293
// in such case(s), the implementation should try single deletes
s3obj.EXPECT().DeleteObjectsWithContext(context.Background(), &s3.DeleteObjectsInput{
Bucket: aws.String("bucket"),
Delete: &s3.Delete{
Objects: []*s3.ObjectIdentifier{
{
Key: aws.String("uploadId"),
},
{
Key: aws.String("uploadId.part"),
},
{
Key: aws.String("uploadId.info"),
},
},
Quiet: aws.Bool(true),
},
}).Return(&s3.DeleteObjectsOutput{}, awserr.NewRequestFailure(
awserr.New(
"InvalidArgument",
"Invalid argument.",
nil,
),
400,
"",
))
// simulates a failure doing the single delete too
// NoSuchUpload errors should be ignored
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId"),
}).Return(&s3.DeleteObjectOutput{}, awserr.NewRequestFailure(
awserr.New(
"SomeErrorCode",
"Some Error",
nil,
),
400,
"",
))
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.part"),
}).Return(&s3.DeleteObjectOutput{}, awserr.NewRequestFailure(
awserr.New(
"NoSuchKey",
"The specified key does not exist.",
nil,
),
404,
"",
))
s3obj.EXPECT().DeleteObjectWithContext(context.Background(), &s3.DeleteObjectInput{
Bucket: aws.String("bucket"),
Key: aws.String("uploadId.info"),
}).Return(&s3.DeleteObjectOutput{}, nil)
upload, err := store.GetUpload(context.Background(), "uploadId+multipartId")
assert.Nil(err)
err = store.AsTerminatableUpload(upload).Terminate(context.Background())
assert.Equal("Multiple errors occurred:\n\tSomeErrorCode: Some Error\n\tstatus code: 400, request id: \n", err.Error())
}
func TestConcatUploadsUsingMultipart(t *testing.T) { func TestConcatUploadsUsingMultipart(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish() defer mockCtrl.Finish()