s3store: Move to new DataStore interfaces

This commit is contained in:
Marius 2019-09-10 10:05:45 +02:00
parent 12114b1ae8
commit d36b0ea3a9
2 changed files with 124 additions and 42 deletions

View File

@ -162,16 +162,19 @@ func New(bucket string, service S3API) S3Store {
func (store S3Store) UseIn(composer *handler.StoreComposer) { func (store S3Store) UseIn(composer *handler.StoreComposer) {
composer.UseCore(store) composer.UseCore(store)
composer.UseTerminater(store) composer.UseTerminater(store)
composer.UseFinisher(store)
composer.UseGetReader(store)
composer.UseConcater(store) composer.UseConcater(store)
composer.UseLengthDeferrer(store) composer.UseLengthDeferrer(store)
} }
func (store S3Store) NewUpload(info handler.FileInfo) (id string, err error) { type s3Upload struct {
id string
store *S3Store
}
func (store S3Store) NewUpload(info handler.FileInfo) (handler.Upload, error) {
// an upload larger than MaxObjectSize must throw an error // an upload larger than MaxObjectSize must throw an error
if info.Size > store.MaxObjectSize { if info.Size > store.MaxObjectSize {
return "", fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize) return nil, fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize)
} }
var uploadId string var uploadId string
@ -198,10 +201,10 @@ func (store S3Store) NewUpload(info handler.FileInfo) (id string, err error) {
Metadata: metadata, Metadata: metadata,
}) })
if err != nil { if err != nil {
return "", fmt.Errorf("s3store: unable to create multipart upload:\n%s", err) return nil, fmt.Errorf("s3store: unable to create multipart upload:\n%s", err)
} }
id = uploadId + "+" + *res.UploadId id := uploadId + "+" + *res.UploadId
info.ID = id info.ID = id
info.Storage = map[string]string{ info.Storage = map[string]string{
@ -212,10 +215,22 @@ func (store S3Store) NewUpload(info handler.FileInfo) (id string, err error) {
err = store.writeInfo(uploadId, info) err = store.writeInfo(uploadId, info)
if err != nil { if err != nil {
return "", fmt.Errorf("s3store: unable to create info file:\n%s", err) return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err)
} }
return id, nil return &s3Upload{id, &store}, nil
}
func (store S3Store) GetUpload(id string) (handler.Upload, error) {
return &s3Upload{id, &store}, nil
}
func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
return upload.(*s3Upload)
}
func (store S3Store) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload {
return upload.(*s3Upload)
} }
func (store S3Store) writeInfo(uploadId string, info handler.FileInfo) error { func (store S3Store) writeInfo(uploadId string, info handler.FileInfo) error {
@ -235,11 +250,14 @@ func (store S3Store) writeInfo(uploadId string, info handler.FileInfo) error {
return err return err
} }
func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { func (upload s3Upload) WriteChunk(offset int64, src io.Reader) (int64, error) {
id := upload.id
store := upload.store
uploadId, multipartId := splitIds(id) uploadId, multipartId := splitIds(id)
// Get the total size of the current upload // Get the total size of the current upload
info, err := store.GetInfo(id) info, err := upload.GetInfo()
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -336,7 +354,9 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
} }
} }
func (store S3Store) GetInfo(id string) (info handler.FileInfo, err error) { func (upload s3Upload) GetInfo() (info handler.FileInfo, err error) {
id := upload.id
store := upload.store
uploadId, _ := splitIds(id) uploadId, _ := splitIds(id)
// Get file info stored in separate object // Get file info stored in separate object
@ -391,7 +411,9 @@ func (store S3Store) GetInfo(id string) (info handler.FileInfo, err error) {
return return
} }
func (store S3Store) GetReader(id string) (io.Reader, error) { func (upload s3Upload) GetReader() (io.Reader, error) {
id := upload.id
store := upload.store
uploadId, multipartId := splitIds(id) uploadId, multipartId := splitIds(id)
// Attempt to get upload content // Attempt to get upload content
@ -432,7 +454,9 @@ func (store S3Store) GetReader(id string) (io.Reader, error) {
return nil, err return nil, err
} }
func (store S3Store) Terminate(id string) error { func (upload s3Upload) Terminate() error {
id := upload.id
store := upload.store
uploadId, multipartId := splitIds(id) uploadId, multipartId := splitIds(id)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
@ -495,7 +519,9 @@ func (store S3Store) Terminate(id string) error {
return nil return nil
} }
func (store S3Store) FinishUpload(id string) error { func (upload s3Upload) FinishUpload() error {
id := upload.id
store := upload.store
uploadId, multipartId := splitIds(id) uploadId, multipartId := splitIds(id)
// Get uploaded parts // Get uploaded parts
@ -564,12 +590,18 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
return newMultiError(errs) return newMultiError(errs)
} }
return store.FinishUpload(dest) upload, err := store.GetUpload(dest)
if err != nil {
return err
}
return upload.FinishUpload()
} }
func (store S3Store) DeclareLength(id string, length int64) error { func (upload s3Upload) DeclareLength(length int64) error {
id := upload.id
store := upload.store
uploadId, _ := splitIds(id) uploadId, _ := splitIds(id)
info, err := store.GetInfo(id) info, err := upload.GetInfo()
if err != nil { if err != nil {
return err return err
} }

View File

@ -20,10 +20,9 @@ import (
// Test interface implementations // Test interface implementations
var _ handler.DataStore = S3Store{} var _ handler.DataStore = S3Store{}
var _ handler.GetReaderDataStore = S3Store{}
var _ handler.TerminaterDataStore = S3Store{} var _ handler.TerminaterDataStore = S3Store{}
var _ handler.FinisherDataStore = S3Store{}
var _ handler.ConcaterDataStore = S3Store{} var _ handler.ConcaterDataStore = S3Store{}
var _ handler.LengthDeferrerDataStore = S3Store{}
func TestNewUpload(t *testing.T) { func TestNewUpload(t *testing.T) {
mockCtrl := gomock.NewController(t) mockCtrl := gomock.NewController(t)
@ -67,9 +66,9 @@ func TestNewUpload(t *testing.T) {
}, },
} }
id, err := store.NewUpload(info) upload, err := store.NewUpload(info)
assert.Nil(err) assert.Nil(err)
assert.Equal("uploadId+multipartId", id) assert.NotNil(upload)
} }
func TestNewUploadWithObjectPrefix(t *testing.T) { func TestNewUploadWithObjectPrefix(t *testing.T) {
@ -115,9 +114,9 @@ func TestNewUploadWithObjectPrefix(t *testing.T) {
}, },
} }
id, err := store.NewUpload(info) upload, err := store.NewUpload(info)
assert.Nil(err) assert.Nil(err)
assert.Equal("uploadId+multipartId", id) assert.NotNil(upload)
} }
func TestNewUploadLargerMaxObjectSize(t *testing.T) { func TestNewUploadLargerMaxObjectSize(t *testing.T) {
@ -136,10 +135,10 @@ func TestNewUploadLargerMaxObjectSize(t *testing.T) {
Size: store.MaxObjectSize + 1, Size: store.MaxObjectSize + 1,
} }
id, err := store.NewUpload(info) upload, err := store.NewUpload(info)
assert.NotNil(err) assert.NotNil(err)
assert.EqualError(err, fmt.Sprintf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize)) assert.EqualError(err, fmt.Sprintf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize))
assert.Equal("", id) assert.Nil(upload)
} }
func TestGetInfoNotFound(t *testing.T) { func TestGetInfoNotFound(t *testing.T) {
@ -155,7 +154,10 @@ func TestGetInfoNotFound(t *testing.T) {
Key: aws.String("uploadId.info"), Key: aws.String("uploadId.info"),
}).Return(nil, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) }).Return(nil, awserr.New("NoSuchKey", "The specified key does not exist.", nil))
_, err := store.GetInfo("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
_, err = upload.GetInfo()
assert.Equal(handler.ErrNotFound, err) assert.Equal(handler.ErrNotFound, err)
} }
@ -209,7 +211,10 @@ func TestGetInfo(t *testing.T) {
}).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)),
) )
info, err := store.GetInfo("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
info, err := upload.GetInfo()
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(500), info.Size) assert.Equal(int64(500), info.Size)
assert.Equal(int64(400), info.Offset) assert.Equal(int64(400), info.Offset)
@ -251,7 +256,10 @@ func TestGetInfoWithIncompletePart(t *testing.T) {
}, nil), }, nil),
) )
info, err := store.GetInfo("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
info, err := upload.GetInfo()
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(10), info.Offset) assert.Equal(int64(10), info.Offset)
assert.Equal("uploadId+multipartId", info.ID) assert.Equal("uploadId+multipartId", info.ID)
@ -280,7 +288,10 @@ func TestGetInfoFinished(t *testing.T) {
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)), }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)),
) )
info, err := store.GetInfo("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
info, err := upload.GetInfo()
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(500), info.Size) assert.Equal(int64(500), info.Size)
assert.Equal(int64(500), info.Offset) assert.Equal(int64(500), info.Offset)
@ -301,7 +312,10 @@ func TestGetReader(t *testing.T) {
Body: ioutil.NopCloser(bytes.NewReader([]byte(`hello world`))), Body: ioutil.NopCloser(bytes.NewReader([]byte(`hello world`))),
}, nil) }, nil)
content, err := store.GetReader("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
content, err := upload.GetReader()
assert.Nil(err) assert.Nil(err)
assert.Equal(ioutil.NopCloser(bytes.NewReader([]byte(`hello world`))), content) assert.Equal(ioutil.NopCloser(bytes.NewReader([]byte(`hello world`))), content)
} }
@ -327,7 +341,10 @@ func TestGetReaderNotFound(t *testing.T) {
}).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)), }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)),
) )
content, err := store.GetReader("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
content, err := upload.GetReader()
assert.Nil(content) assert.Nil(content)
assert.Equal(handler.ErrNotFound, err) assert.Equal(handler.ErrNotFound, err)
} }
@ -355,7 +372,10 @@ func TestGetReaderNotFinished(t *testing.T) {
}, nil), }, nil),
) )
content, err := store.GetReader("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
content, err := upload.GetReader()
assert.Nil(content) assert.Nil(content)
assert.Equal("cannot stream non-finished upload", err.Error()) assert.Equal("cannot stream non-finished upload", err.Error())
} }
@ -395,7 +415,10 @@ func TestDeclareLength(t *testing.T) {
}), }),
) )
err := store.DeclareLength("uploadId+multipartId", 500) upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
err = store.AsLengthDeclarableUpload(upload).DeclareLength(500)
assert.Nil(err) assert.Nil(err)
} }
@ -466,7 +489,10 @@ func TestFinishUpload(t *testing.T) {
}).Return(nil, nil), }).Return(nil, nil),
) )
err := store.FinishUpload("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
err = upload.FinishUpload()
assert.Nil(err) assert.Nil(err)
} }
@ -555,7 +581,10 @@ func TestWriteChunk(t *testing.T) {
})).Return(nil, nil), })).Return(nil, nil),
) )
bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD"))) upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
bytesRead, err := upload.WriteChunk(300, bytes.NewReader([]byte("1234567890ABCD")))
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(14), bytesRead) assert.Equal(int64(14), bytesRead)
} }
@ -634,7 +663,10 @@ func TestWriteChunkWithUnexpectedEOF(t *testing.T) {
writer.CloseWithError(io.ErrUnexpectedEOF) writer.CloseWithError(io.ErrUnexpectedEOF)
}() }()
bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, reader) upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
bytesRead, err := upload.WriteChunk(300, reader)
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(14), bytesRead) assert.Equal(int64(14), bytesRead)
} }
@ -699,7 +731,10 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) {
})).Return(nil, nil), })).Return(nil, nil),
) )
bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890"))) upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
bytesRead, err := upload.WriteChunk(300, bytes.NewReader([]byte("1234567890")))
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(10), bytesRead) assert.Equal(int64(10), bytesRead)
} }
@ -769,7 +804,10 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) {
})).Return(nil, nil), })).Return(nil, nil),
) )
bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
bytesRead, err := upload.WriteChunk(3, bytes.NewReader([]byte("45")))
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(2), bytesRead) assert.Equal(int64(2), bytesRead)
} }
@ -837,7 +875,10 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing.
})).Return(nil, nil), })).Return(nil, nil),
) )
bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
bytesRead, err := upload.WriteChunk(3, bytes.NewReader([]byte("45")))
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(2), bytesRead) assert.Equal(int64(2), bytesRead)
} }
@ -905,10 +946,13 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) {
})).Return(nil, nil), })).Return(nil, nil),
) )
upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
// 10 bytes are missing for the upload to be finished (offset at 490 for 500 // 10 bytes are missing for the upload to be finished (offset at 490 for 500
// bytes file) but the minimum chunk size is higher (20). The chunk is // bytes file) but the minimum chunk size is higher (20). The chunk is
// still uploaded since the last part may be smaller than the minimum. // still uploaded since the last part may be smaller than the minimum.
bytesRead, err := store.WriteChunk("uploadId+multipartId", 490, bytes.NewReader([]byte("1234567890"))) bytesRead, err := upload.WriteChunk(490, bytes.NewReader([]byte("1234567890")))
assert.Nil(err) assert.Nil(err)
assert.Equal(int64(10), bytesRead) assert.Equal(int64(10), bytesRead)
} }
@ -946,7 +990,10 @@ func TestTerminate(t *testing.T) {
}, },
}).Return(&s3.DeleteObjectsOutput{}, nil) }).Return(&s3.DeleteObjectsOutput{}, nil)
err := store.Terminate("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
err = store.AsTerminatableUpload(upload).Terminate()
assert.Nil(err) assert.Nil(err)
} }
@ -992,7 +1039,10 @@ func TestTerminateWithErrors(t *testing.T) {
}, },
}, nil) }, nil)
err := store.Terminate("uploadId+multipartId") upload, err := store.GetUpload("uploadId+multipartId")
assert.Nil(err)
err = store.AsTerminatableUpload(upload).Terminate()
assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error()) assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error())
} }