diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index b71289c..44355b2 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -162,16 +162,19 @@ func New(bucket string, service S3API) S3Store { func (store S3Store) UseIn(composer *handler.StoreComposer) { composer.UseCore(store) composer.UseTerminater(store) - composer.UseFinisher(store) - composer.UseGetReader(store) composer.UseConcater(store) composer.UseLengthDeferrer(store) } -func (store S3Store) NewUpload(info handler.FileInfo) (id string, err error) { +type s3Upload struct { + id string + store *S3Store +} + +func (store S3Store) NewUpload(info handler.FileInfo) (handler.Upload, error) { // an upload larger than MaxObjectSize must throw an error if info.Size > store.MaxObjectSize { - return "", fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize) + return nil, fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize) } var uploadId string @@ -198,10 +201,10 @@ func (store S3Store) NewUpload(info handler.FileInfo) (id string, err error) { Metadata: metadata, }) if err != nil { - return "", fmt.Errorf("s3store: unable to create multipart upload:\n%s", err) + return nil, fmt.Errorf("s3store: unable to create multipart upload:\n%s", err) } - id = uploadId + "+" + *res.UploadId + id := uploadId + "+" + *res.UploadId info.ID = id info.Storage = map[string]string{ @@ -212,10 +215,22 @@ func (store S3Store) NewUpload(info handler.FileInfo) (id string, err error) { err = store.writeInfo(uploadId, info) if err != nil { - return "", fmt.Errorf("s3store: unable to create info file:\n%s", err) + return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err) } - return id, nil + return &s3Upload{id, &store}, nil +} + +func (store S3Store) GetUpload(id string) (handler.Upload, error) { + return &s3Upload{id, &store}, nil +} + +func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload { + return upload.(*s3Upload) +} + +func (store S3Store) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload { + return upload.(*s3Upload) } func (store S3Store) writeInfo(uploadId string, info handler.FileInfo) error { @@ -235,11 +250,14 @@ func (store S3Store) writeInfo(uploadId string, info handler.FileInfo) error { return err } -func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { +func (upload s3Upload) WriteChunk(offset int64, src io.Reader) (int64, error) { + id := upload.id + store := upload.store + uploadId, multipartId := splitIds(id) // Get the total size of the current upload - info, err := store.GetInfo(id) + info, err := upload.GetInfo() if err != nil { return 0, err } @@ -336,7 +354,9 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64, } } -func (store S3Store) GetInfo(id string) (info handler.FileInfo, err error) { +func (upload s3Upload) GetInfo() (info handler.FileInfo, err error) { + id := upload.id + store := upload.store uploadId, _ := splitIds(id) // Get file info stored in separate object @@ -391,7 +411,9 @@ func (store S3Store) GetInfo(id string) (info handler.FileInfo, err error) { return } -func (store S3Store) GetReader(id string) (io.Reader, error) { +func (upload s3Upload) GetReader() (io.Reader, error) { + id := upload.id + store := upload.store uploadId, multipartId := splitIds(id) // Attempt to get upload content @@ -432,7 +454,9 @@ func (store S3Store) GetReader(id string) (io.Reader, error) { return nil, err } -func (store S3Store) Terminate(id string) error { +func (upload s3Upload) Terminate() error { + id := upload.id + store := upload.store uploadId, multipartId := splitIds(id) var wg sync.WaitGroup wg.Add(2) @@ -495,7 +519,9 @@ func (store S3Store) Terminate(id string) error { return nil } -func (store S3Store) FinishUpload(id string) error { +func (upload s3Upload) FinishUpload() error { + id := upload.id + store := upload.store uploadId, multipartId := splitIds(id) // Get uploaded parts @@ -564,12 +590,18 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error { return newMultiError(errs) } - return store.FinishUpload(dest) + upload, err := store.GetUpload(dest) + if err != nil { + return err + } + return upload.FinishUpload() } -func (store S3Store) DeclareLength(id string, length int64) error { +func (upload s3Upload) DeclareLength(length int64) error { + id := upload.id + store := upload.store uploadId, _ := splitIds(id) - info, err := store.GetInfo(id) + info, err := upload.GetInfo() if err != nil { return err } diff --git a/pkg/s3store/s3store_test.go b/pkg/s3store/s3store_test.go index 709484b..59b7b68 100644 --- a/pkg/s3store/s3store_test.go +++ b/pkg/s3store/s3store_test.go @@ -20,10 +20,9 @@ import ( // Test interface implementations var _ handler.DataStore = S3Store{} -var _ handler.GetReaderDataStore = S3Store{} var _ handler.TerminaterDataStore = S3Store{} -var _ handler.FinisherDataStore = S3Store{} var _ handler.ConcaterDataStore = S3Store{} +var _ handler.LengthDeferrerDataStore = S3Store{} func TestNewUpload(t *testing.T) { mockCtrl := gomock.NewController(t) @@ -67,9 +66,9 @@ func TestNewUpload(t *testing.T) { }, } - id, err := store.NewUpload(info) + upload, err := store.NewUpload(info) assert.Nil(err) - assert.Equal("uploadId+multipartId", id) + assert.NotNil(upload) } func TestNewUploadWithObjectPrefix(t *testing.T) { @@ -115,9 +114,9 @@ func TestNewUploadWithObjectPrefix(t *testing.T) { }, } - id, err := store.NewUpload(info) + upload, err := store.NewUpload(info) assert.Nil(err) - assert.Equal("uploadId+multipartId", id) + assert.NotNil(upload) } func TestNewUploadLargerMaxObjectSize(t *testing.T) { @@ -136,10 +135,10 @@ func TestNewUploadLargerMaxObjectSize(t *testing.T) { Size: store.MaxObjectSize + 1, } - id, err := store.NewUpload(info) + upload, err := store.NewUpload(info) assert.NotNil(err) assert.EqualError(err, fmt.Sprintf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize)) - assert.Equal("", id) + assert.Nil(upload) } func TestGetInfoNotFound(t *testing.T) { @@ -155,7 +154,10 @@ func TestGetInfoNotFound(t *testing.T) { Key: aws.String("uploadId.info"), }).Return(nil, awserr.New("NoSuchKey", "The specified key does not exist.", nil)) - _, err := store.GetInfo("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + _, err = upload.GetInfo() assert.Equal(handler.ErrNotFound, err) } @@ -209,7 +211,10 @@ func TestGetInfo(t *testing.T) { }).Return(&s3.GetObjectOutput{}, awserr.New("NoSuchKey", "Not found", nil)), ) - info, err := store.GetInfo("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + info, err := upload.GetInfo() assert.Nil(err) assert.Equal(int64(500), info.Size) assert.Equal(int64(400), info.Offset) @@ -251,7 +256,10 @@ func TestGetInfoWithIncompletePart(t *testing.T) { }, nil), ) - info, err := store.GetInfo("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + info, err := upload.GetInfo() assert.Nil(err) assert.Equal(int64(10), info.Offset) assert.Equal("uploadId+multipartId", info.ID) @@ -280,7 +288,10 @@ func TestGetInfoFinished(t *testing.T) { }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)), ) - info, err := store.GetInfo("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + info, err := upload.GetInfo() assert.Nil(err) assert.Equal(int64(500), info.Size) assert.Equal(int64(500), info.Offset) @@ -301,7 +312,10 @@ func TestGetReader(t *testing.T) { Body: ioutil.NopCloser(bytes.NewReader([]byte(`hello world`))), }, nil) - content, err := store.GetReader("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + content, err := upload.GetReader() assert.Nil(err) assert.Equal(ioutil.NopCloser(bytes.NewReader([]byte(`hello world`))), content) } @@ -327,7 +341,10 @@ func TestGetReaderNotFound(t *testing.T) { }).Return(nil, awserr.New("NoSuchUpload", "The specified upload does not exist.", nil)), ) - content, err := store.GetReader("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + content, err := upload.GetReader() assert.Nil(content) assert.Equal(handler.ErrNotFound, err) } @@ -355,7 +372,10 @@ func TestGetReaderNotFinished(t *testing.T) { }, nil), ) - content, err := store.GetReader("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + content, err := upload.GetReader() assert.Nil(content) assert.Equal("cannot stream non-finished upload", err.Error()) } @@ -395,7 +415,10 @@ func TestDeclareLength(t *testing.T) { }), ) - err := store.DeclareLength("uploadId+multipartId", 500) + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + err = store.AsLengthDeclarableUpload(upload).DeclareLength(500) assert.Nil(err) } @@ -466,7 +489,10 @@ func TestFinishUpload(t *testing.T) { }).Return(nil, nil), ) - err := store.FinishUpload("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + err = upload.FinishUpload() assert.Nil(err) } @@ -555,7 +581,10 @@ func TestWriteChunk(t *testing.T) { })).Return(nil, nil), ) - bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890ABCD"))) + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + bytesRead, err := upload.WriteChunk(300, bytes.NewReader([]byte("1234567890ABCD"))) assert.Nil(err) assert.Equal(int64(14), bytesRead) } @@ -634,7 +663,10 @@ func TestWriteChunkWithUnexpectedEOF(t *testing.T) { writer.CloseWithError(io.ErrUnexpectedEOF) }() - bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, reader) + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + bytesRead, err := upload.WriteChunk(300, reader) assert.Nil(err) assert.Equal(int64(14), bytesRead) } @@ -699,7 +731,10 @@ func TestWriteChunkWriteIncompletePartBecauseTooSmall(t *testing.T) { })).Return(nil, nil), ) - bytesRead, err := store.WriteChunk("uploadId+multipartId", 300, bytes.NewReader([]byte("1234567890"))) + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + bytesRead, err := upload.WriteChunk(300, bytes.NewReader([]byte("1234567890"))) assert.Nil(err) assert.Equal(int64(10), bytesRead) } @@ -769,7 +804,10 @@ func TestWriteChunkPrependsIncompletePart(t *testing.T) { })).Return(nil, nil), ) - bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + bytesRead, err := upload.WriteChunk(3, bytes.NewReader([]byte("45"))) assert.Nil(err) assert.Equal(int64(2), bytesRead) } @@ -837,7 +875,10 @@ func TestWriteChunkPrependsIncompletePartAndWritesANewIncompletePart(t *testing. })).Return(nil, nil), ) - bytesRead, err := store.WriteChunk("uploadId+multipartId", 3, bytes.NewReader([]byte("45"))) + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + bytesRead, err := upload.WriteChunk(3, bytes.NewReader([]byte("45"))) assert.Nil(err) assert.Equal(int64(2), bytesRead) } @@ -905,10 +946,13 @@ func TestWriteChunkAllowTooSmallLast(t *testing.T) { })).Return(nil, nil), ) + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + // 10 bytes are missing for the upload to be finished (offset at 490 for 500 // bytes file) but the minimum chunk size is higher (20). The chunk is // still uploaded since the last part may be smaller than the minimum. - bytesRead, err := store.WriteChunk("uploadId+multipartId", 490, bytes.NewReader([]byte("1234567890"))) + bytesRead, err := upload.WriteChunk(490, bytes.NewReader([]byte("1234567890"))) assert.Nil(err) assert.Equal(int64(10), bytesRead) } @@ -946,7 +990,10 @@ func TestTerminate(t *testing.T) { }, }).Return(&s3.DeleteObjectsOutput{}, nil) - err := store.Terminate("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + err = store.AsTerminatableUpload(upload).Terminate() assert.Nil(err) } @@ -992,7 +1039,10 @@ func TestTerminateWithErrors(t *testing.T) { }, }, nil) - err := store.Terminate("uploadId+multipartId") + upload, err := store.GetUpload("uploadId+multipartId") + assert.Nil(err) + + err = store.AsTerminatableUpload(upload).Terminate() assert.Equal("Multiple errors occurred:\n\tAWS S3 Error (hello) for object uploadId: it's me.\n", err.Error()) }