diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index aee2c68..6709ddc 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -5,6 +5,7 @@ import ( "path/filepath" "github.com/tus/tusd/pkg/filestore" + "github.com/tus/tusd/pkg/gcsstore" "github.com/tus/tusd/pkg/handler" "github.com/tus/tusd/pkg/memorylocker" "github.com/tus/tusd/pkg/s3store" @@ -47,7 +48,7 @@ func CreateComposer() { stderr.Fatalf("No service account file provided for Google Cloud Storage using the GCS_SERVICE_ACCOUNT_FILE environment variable.\n") } - /*service, err := gcsstore.NewGCSService(gcsSAF) + service, err := gcsstore.NewGCSService(gcsSAF) if err != nil { stderr.Fatalf("Unable to create Google Cloud Storage service: %s\n", err) } @@ -59,7 +60,7 @@ func CreateComposer() { store.UseIn(Composer) locker := memorylocker.New() - locker.UseIn(Composer)*/ + locker.UseIn(Composer) } else { dir, err := filepath.Abs(Flags.UploadDir) if err != nil { diff --git a/pkg/gcsstore/gcsstore.go b/pkg/gcsstore/gcsstore.go index a4fe264..301b8ae 100644 --- a/pkg/gcsstore/gcsstore.go +++ b/pkg/gcsstore/gcsstore.go @@ -54,11 +54,9 @@ func New(bucket string, service GCSAPI) GCSStore { func (store GCSStore) UseIn(composer *handler.StoreComposer) { composer.UseCore(store) composer.UseTerminater(store) - composer.UseFinisher(store) - composer.UseGetReader(store) } -func (store GCSStore) NewUpload(info handler.FileInfo) (id string, err error) { +func (store GCSStore) NewUpload(info handler.FileInfo) (handler.Upload, error) { if info.ID == "" { info.ID = uid.Uid() } @@ -70,15 +68,31 @@ func (store GCSStore) NewUpload(info handler.FileInfo) (id string, err error) { } ctx := context.Background() - err = store.writeInfo(ctx, store.keyWithPrefix(info.ID), info) + err := store.writeInfo(ctx, store.keyWithPrefix(info.ID), info) if err != nil { - return info.ID, err + return &gcsUpload{info.ID, &store}, err } - return info.ID, nil + return &gcsUpload{info.ID, &store}, nil } -func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { +type gcsUpload struct { + id string + store *GCSStore +} + +func (store GCSStore) GetUpload(id string) (handler.Upload, error) { + return &gcsUpload{id, &store}, nil +} + +func (store GCSStore) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload { + return upload.(*gcsUpload) +} + +func (upload gcsUpload) WriteChunk(offset int64, src io.Reader) (int64, error) { + id := upload.id + store := upload.store + prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id)) filterParams := GCSFilterParams{ Bucket: store.Bucket, @@ -121,7 +135,10 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, const CONCURRENT_SIZE_REQUESTS = 32 -func (store GCSStore) GetInfo(id string) (handler.FileInfo, error) { +func (upload gcsUpload) GetInfo() (handler.FileInfo, error) { + id := upload.id + store := upload.store + info := handler.FileInfo{} i := fmt.Sprintf("%s.info", store.keyWithPrefix(id)) @@ -241,7 +258,10 @@ func (store GCSStore) writeInfo(ctx context.Context, id string, info handler.Fil return nil } -func (store GCSStore) FinishUpload(id string) error { +func (upload gcsUpload) FinishUpload() error { + id := upload.id + store := upload.store + prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id)) filterParams := GCSFilterParams{ Bucket: store.Bucket, @@ -270,7 +290,7 @@ func (store GCSStore) FinishUpload(id string) error { return err } - info, err := store.GetInfo(id) + info, err := upload.GetInfo() if err != nil { return err } @@ -288,7 +308,10 @@ func (store GCSStore) FinishUpload(id string) error { return nil } -func (store GCSStore) Terminate(id string) error { +func (upload gcsUpload) Terminate() error { + id := upload.id + store := upload.store + filterParams := GCSFilterParams{ Bucket: store.Bucket, Prefix: store.keyWithPrefix(id), @@ -303,7 +326,10 @@ func (store GCSStore) Terminate(id string) error { return nil } -func (store GCSStore) GetReader(id string) (io.Reader, error) { +func (upload gcsUpload) GetReader() (io.Reader, error) { + id := upload.id + store := upload.store + params := GCSObjectParams{ Bucket: store.Bucket, ID: store.keyWithPrefix(id), diff --git a/pkg/gcsstore/gcsstore_test.go b/pkg/gcsstore/gcsstore_test.go index 8b0cc5e..25e9592 100644 --- a/pkg/gcsstore/gcsstore_test.go +++ b/pkg/gcsstore/gcsstore_test.go @@ -64,9 +64,9 @@ func TestNewUpload(t *testing.T) { ctx := context.Background() service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil) - id, err := store.NewUpload(mockTusdInfo) + upload, err := store.NewUpload(mockTusdInfo) assert.Nil(err) - assert.Equal(id, mockID) + assert.NotNil(upload) } func TestNewUploadWithPrefix(t *testing.T) { @@ -99,9 +99,9 @@ func TestNewUploadWithPrefix(t *testing.T) { ctx := context.Background() service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil) - id, err := store.NewUpload(mockTusdInfo) + upload, err := store.NewUpload(mockTusdInfo) assert.Nil(err) - assert.Equal(id, mockID) + assert.NotNil(upload) } type MockGetInfoReader struct{} @@ -185,7 +185,10 @@ func TestGetInfo(t *testing.T) { service.EXPECT().WriteObject(ctx, params, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize) - info, err := store.GetInfo(mockID) + upload, err := store.GetUpload(mockID) + assert.Nil(err) + + info, err := upload.GetInfo() assert.Nil(err) assert.Equal(mockTusdInfo, info) @@ -211,7 +214,10 @@ func TestGetInfoNotFound(t *testing.T) { service.EXPECT().ReadObject(ctx, params).Return(nil, storage.ErrObjectNotExist), ) - _, err := store.GetInfo(mockID) + upload, err := store.GetUpload(mockID) + assert.Nil(err) + + _, err = upload.GetInfo() assert.Equal(handler.ErrNotFound, err) } @@ -257,7 +263,11 @@ func TestGetReader(t *testing.T) { ctx := context.Background() service.EXPECT().ReadObject(ctx, params).Return(r, nil) - reader, err := store.GetReader(mockID) + + upload, err := store.GetUpload(mockID) + assert.Nil(err) + + reader, err := upload.GetReader() assert.Nil(err) buf := make([]byte, len(mockReaderData)) @@ -285,7 +295,10 @@ func TestTerminate(t *testing.T) { ctx := context.Background() service.EXPECT().DeleteObjectsWithFilter(ctx, filterParams).Return(nil) - err := store.Terminate(mockID) + upload, err := store.GetUpload(mockID) + assert.Nil(err) + + err = store.AsTerminatableUpload(upload).Terminate() assert.Nil(err) } @@ -371,7 +384,10 @@ func TestFinishUpload(t *testing.T) { writeObject := service.EXPECT().WriteObject(ctx, infoParams, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize) service.EXPECT().SetObjectMetadata(ctx, objectParams, metadata).Return(nil).After(writeObject) - err = store.FinishUpload(mockID) + upload, err := store.GetUpload(mockID) + assert.Nil(err) + + err = upload.FinishUpload() assert.Nil(err) // Cancel the context to avoid getting an error from `go vet` @@ -446,7 +462,11 @@ func TestWriteChunk(t *testing.T) { reader := bytes.NewReader([]byte(mockReaderData)) var offset int64 offset = mockSize / 3 - _, err := store.WriteChunk(mockID, offset, reader) + + upload, err := store.GetUpload(mockID) + assert.Nil(err) + + _, err = upload.WriteChunk(offset, reader) assert.Nil(err) }