gcsstore: Adjust to new DataStore interfaces

parent e591c9ea69
commit d01d878e29
@@ -5,6 +5,7 @@ import (
 	"path/filepath"
 
 	"github.com/tus/tusd/pkg/filestore"
+	"github.com/tus/tusd/pkg/gcsstore"
 	"github.com/tus/tusd/pkg/handler"
 	"github.com/tus/tusd/pkg/memorylocker"
 	"github.com/tus/tusd/pkg/s3store"
@@ -47,7 +48,7 @@ func CreateComposer() {
 			stderr.Fatalf("No service account file provided for Google Cloud Storage using the GCS_SERVICE_ACCOUNT_FILE environment variable.\n")
 		}
 
-		/*service, err := gcsstore.NewGCSService(gcsSAF)
+		service, err := gcsstore.NewGCSService(gcsSAF)
 		if err != nil {
 			stderr.Fatalf("Unable to create Google Cloud Storage service: %s\n", err)
 		}
@@ -59,7 +60,7 @@ func CreateComposer() {
 		store.UseIn(Composer)
 
 		locker := memorylocker.New()
-		locker.UseIn(Composer)*/
+		locker.UseIn(Composer)
 	} else {
 		dir, err := filepath.Abs(Flags.UploadDir)
 		if err != nil {
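The three hunks above are the CLI composer side of the change: the gcsstore import returns and the previously commented-out Google Cloud Storage block is re-enabled. Read together, that branch of CreateComposer ends up roughly as sketched below; the lines between the two hunks, where the store value itself is constructed, are not shown in this diff, so the store construction and the bucket flag name are assumptions.

    // Sketch, not a verbatim excerpt of the file after this commit.
    gcsSAF := os.Getenv("GCS_SERVICE_ACCOUNT_FILE") // env var named in the error message above; exact lookup not shown in this diff
    if gcsSAF == "" {
        stderr.Fatalf("No service account file provided for Google Cloud Storage using the GCS_SERVICE_ACCOUNT_FILE environment variable.\n")
    }

    service, err := gcsstore.NewGCSService(gcsSAF)
    if err != nil {
        stderr.Fatalf("Unable to create Google Cloud Storage service: %s\n", err)
    }

    store := gcsstore.New(Flags.GCSBucket, service) // assumed: construction and flag name are not visible in the hunks
    store.UseIn(Composer)

    locker := memorylocker.New()
    locker.UseIn(Composer)

The following hunks adjust the GCS store itself.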
@@ -54,11 +54,9 @@ func New(bucket string, service GCSAPI) GCSStore {
 func (store GCSStore) UseIn(composer *handler.StoreComposer) {
 	composer.UseCore(store)
 	composer.UseTerminater(store)
-	composer.UseFinisher(store)
-	composer.UseGetReader(store)
 }
 
-func (store GCSStore) NewUpload(info handler.FileInfo) (id string, err error) {
+func (store GCSStore) NewUpload(info handler.FileInfo) (handler.Upload, error) {
 	if info.ID == "" {
 		info.ID = uid.Uid()
 	}
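UseIn now registers only the core and termination extensions; finishing an upload and obtaining a reader are no longer store-level capabilities but methods of the upload type introduced in the next hunk. Inside the gcsstore package, a compile-time guard along these lines could pin that contract down; it is not part of this commit, and the DataStore/TerminaterDataStore interface names are assumptions based on how the diff uses the handler package.

    // Hypothetical compile-time checks, not in this commit.
    var (
        _ handler.DataStore           = GCSStore{}
        _ handler.TerminaterDataStore = GCSStore{}
        _ handler.Upload              = &gcsUpload{}
        _ handler.TerminatableUpload  = &gcsUpload{}
    )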
@@ -70,15 +68,31 @@ func (store GCSStore) NewUpload(info handler.FileInfo) (id string, err error) {
 	}
 
 	ctx := context.Background()
-	err = store.writeInfo(ctx, store.keyWithPrefix(info.ID), info)
+	err := store.writeInfo(ctx, store.keyWithPrefix(info.ID), info)
 	if err != nil {
-		return info.ID, err
+		return &gcsUpload{info.ID, &store}, err
 	}
 
-	return info.ID, nil
+	return &gcsUpload{info.ID, &store}, nil
 }
 
-func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
+type gcsUpload struct {
+	id    string
+	store *GCSStore
+}
+
+func (store GCSStore) GetUpload(id string) (handler.Upload, error) {
+	return &gcsUpload{id, &store}, nil
+}
+
+func (store GCSStore) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
+	return upload.(*gcsUpload)
+}
+
+func (upload gcsUpload) WriteChunk(offset int64, src io.Reader) (int64, error) {
+	id := upload.id
+	store := upload.store
+
 	prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
 	filterParams := GCSFilterParams{
 		Bucket: store.Bucket,
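From the method set implemented here, the store-facing interfaces in github.com/tus/tusd/pkg/handler that this commit codes against appear to have roughly the following shape. This is a reconstruction from the signatures visible in the diff (no context.Context parameters yet at this point in the refactor), not a quote of the handler package source.

    // Sketch only: reconstructed interface shapes, NOT the handler package itself.
    package handlersketch

    import "io"

    // FileInfo is a minimal stand-in for handler.FileInfo.
    type FileInfo struct {
        ID     string
        Size   int64
        Offset int64
    }

    type DataStore interface {
        NewUpload(info FileInfo) (Upload, error)
        GetUpload(id string) (Upload, error)
    }

    type Upload interface {
        WriteChunk(offset int64, src io.Reader) (int64, error)
        GetInfo() (FileInfo, error)
        GetReader() (io.Reader, error)
        FinishUpload() error
    }

    type TerminatableUpload interface {
        Terminate() error
    }

    type TerminaterDataStore interface {
        AsTerminatableUpload(upload Upload) TerminatableUpload
    }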
@@ -121,7 +135,10 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
 
 const CONCURRENT_SIZE_REQUESTS = 32
 
-func (store GCSStore) GetInfo(id string) (handler.FileInfo, error) {
+func (upload gcsUpload) GetInfo() (handler.FileInfo, error) {
+	id := upload.id
+	store := upload.store
+
 	info := handler.FileInfo{}
 	i := fmt.Sprintf("%s.info", store.keyWithPrefix(id))
 
@@ -241,7 +258,10 @@ func (store GCSStore) writeInfo(ctx context.Context, id string, info handler.FileInfo) error {
 	return nil
 }
 
-func (store GCSStore) FinishUpload(id string) error {
+func (upload gcsUpload) FinishUpload() error {
+	id := upload.id
+	store := upload.store
+
 	prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
 	filterParams := GCSFilterParams{
 		Bucket: store.Bucket,
@@ -270,7 +290,7 @@ func (store GCSStore) FinishUpload(id string) error {
 		return err
 	}
 
-	info, err := store.GetInfo(id)
+	info, err := upload.GetInfo()
 	if err != nil {
 		return err
 	}
@@ -288,7 +308,10 @@ func (store GCSStore) FinishUpload(id string) error {
 	return nil
 }
 
-func (store GCSStore) Terminate(id string) error {
+func (upload gcsUpload) Terminate() error {
+	id := upload.id
+	store := upload.store
+
 	filterParams := GCSFilterParams{
 		Bucket: store.Bucket,
 		Prefix: store.keyWithPrefix(id),
@@ -303,7 +326,10 @@ func (store GCSStore) Terminate(id string) error {
 	return nil
 }
 
-func (store GCSStore) GetReader(id string) (io.Reader, error) {
+func (upload gcsUpload) GetReader() (io.Reader, error) {
+	id := upload.id
+	store := upload.store
+
 	params := GCSObjectParams{
 		Bucket: store.Bucket,
 		ID:     store.keyWithPrefix(id),
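That completes the store itself: every per-upload operation that previously took an id on GCSStore (GetInfo, GetReader, WriteChunk, FinishUpload, Terminate) is now a method on gcsUpload, and termination is reached through AsTerminatableUpload. For calling code the switch looks roughly like the sketch below (error handling trimmed; identifiers as in the diff; assumes imports of io and github.com/tus/tusd/pkg/gcsstore).

    // Sketch only, not an excerpt from tusd.
    func useGCSUpload(store gcsstore.GCSStore, id string, offset int64, src io.Reader) error {
        upload, err := store.GetUpload(id) // previously the id was passed to every store call
        if err != nil {
            return err
        }

        if _, err := upload.GetInfo(); err != nil { // was store.GetInfo(id)
            return err
        }
        if _, err := upload.WriteChunk(offset, src); err != nil { // was store.WriteChunk(id, offset, src)
            return err
        }
        if err := upload.FinishUpload(); err != nil { // was store.FinishUpload(id)
            return err
        }

        // Termination goes through the TerminaterDataStore extension.
        return store.AsTerminatableUpload(upload).Terminate() // was store.Terminate(id)
    }

The remaining hunks update the gcsstore tests to the same pattern.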
@@ -64,9 +64,9 @@ func TestNewUpload(t *testing.T) {
 	ctx := context.Background()
 	service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil)
 
-	id, err := store.NewUpload(mockTusdInfo)
+	upload, err := store.NewUpload(mockTusdInfo)
 	assert.Nil(err)
-	assert.Equal(id, mockID)
+	assert.NotNil(upload)
 }
 
 func TestNewUploadWithPrefix(t *testing.T) {
@@ -99,9 +99,9 @@ func TestNewUploadWithPrefix(t *testing.T) {
 	ctx := context.Background()
 	service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil)
 
-	id, err := store.NewUpload(mockTusdInfo)
+	upload, err := store.NewUpload(mockTusdInfo)
 	assert.Nil(err)
-	assert.Equal(id, mockID)
+	assert.NotNil(upload)
 }
 
 type MockGetInfoReader struct{}
@@ -185,7 +185,10 @@ func TestGetInfo(t *testing.T) {
 
 	service.EXPECT().WriteObject(ctx, params, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize)
 
-	info, err := store.GetInfo(mockID)
+	upload, err := store.GetUpload(mockID)
+	assert.Nil(err)
+
+	info, err := upload.GetInfo()
 	assert.Nil(err)
 	assert.Equal(mockTusdInfo, info)
 
@@ -211,7 +214,10 @@ func TestGetInfoNotFound(t *testing.T) {
 		service.EXPECT().ReadObject(ctx, params).Return(nil, storage.ErrObjectNotExist),
 	)
 
-	_, err := store.GetInfo(mockID)
+	upload, err := store.GetUpload(mockID)
+	assert.Nil(err)
+
+	_, err = upload.GetInfo()
 	assert.Equal(handler.ErrNotFound, err)
 }
 
@@ -257,7 +263,11 @@ func TestGetReader(t *testing.T) {
 
 	ctx := context.Background()
 	service.EXPECT().ReadObject(ctx, params).Return(r, nil)
-	reader, err := store.GetReader(mockID)
+
+	upload, err := store.GetUpload(mockID)
+	assert.Nil(err)
+
+	reader, err := upload.GetReader()
 	assert.Nil(err)
 
 	buf := make([]byte, len(mockReaderData))
@@ -285,7 +295,10 @@ func TestTerminate(t *testing.T) {
 	ctx := context.Background()
 	service.EXPECT().DeleteObjectsWithFilter(ctx, filterParams).Return(nil)
 
-	err := store.Terminate(mockID)
+	upload, err := store.GetUpload(mockID)
+	assert.Nil(err)
+
+	err = store.AsTerminatableUpload(upload).Terminate()
 	assert.Nil(err)
 }
 
@@ -371,7 +384,10 @@ func TestFinishUpload(t *testing.T) {
 	writeObject := service.EXPECT().WriteObject(ctx, infoParams, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize)
 	service.EXPECT().SetObjectMetadata(ctx, objectParams, metadata).Return(nil).After(writeObject)
 
-	err = store.FinishUpload(mockID)
+	upload, err := store.GetUpload(mockID)
+	assert.Nil(err)
+
+	err = upload.FinishUpload()
 	assert.Nil(err)
 
 	// Cancel the context to avoid getting an error from `go vet`
@@ -446,7 +462,11 @@ func TestWriteChunk(t *testing.T) {
 	reader := bytes.NewReader([]byte(mockReaderData))
 	var offset int64
 	offset = mockSize / 3
-	_, err := store.WriteChunk(mockID, offset, reader)
+
+	upload, err := store.GetUpload(mockID)
+	assert.Nil(err)
+
+	_, err = upload.WriteChunk(offset, reader)
 	assert.Nil(err)
 
 }
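Every converted test now starts with the same two lines: fetch the upload handle with GetUpload and assert that it succeeded. If that pattern keeps spreading, a small helper in the test file could absorb it; this is only a suggestion, not part of the commit, and it assumes the testing and handler imports already present in these tests.

    // Hypothetical helper, not in this commit.
    func mustGetUpload(t *testing.T, store GCSStore, id string) handler.Upload {
        t.Helper()
        upload, err := store.GetUpload(id)
        if err != nil {
            t.Fatalf("GetUpload(%q) failed: %v", id, err)
        }
        return upload
    }

With it, a test body would read upload := mustGetUpload(t, store, mockID) followed by the call under test, e.g. info, err := upload.GetInfo().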