From a1273df6cd86076268d2d083005d6bd2ab899f76 Mon Sep 17 00:00:00 2001 From: Tom Berger Date: Wed, 25 Jul 2018 10:25:59 -0400 Subject: [PATCH 1/3] Pass contexts to GCSService. Add support for concurrent GetObjectSize requests in GetInfo --- gcsstore/gcsservice.go | 82 ++++++++++++++++++++-------------------- gcsstore/gcsstore.go | 85 ++++++++++++++++++++++++++++++++---------- 2 files changed, 105 insertions(+), 62 deletions(-) diff --git a/gcsstore/gcsservice.go b/gcsstore/gcsservice.go index a5c34e8..f1b7319 100644 --- a/gcsstore/gcsservice.go +++ b/gcsstore/gcsservice.go @@ -59,14 +59,14 @@ type GCSReader interface { // operations that are required to enable the tus protocol // to work with Google's cloud storage. type GCSAPI interface { - ReadObject(params GCSObjectParams) (GCSReader, error) - GetObjectSize(params GCSObjectParams) (int64, error) - SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error - DeleteObject(params GCSObjectParams) error - DeleteObjectsWithFilter(params GCSFilterParams) error - WriteObject(params GCSObjectParams, r io.Reader) (int64, error) - ComposeObjects(params GCSComposeParams) error - FilterObjects(params GCSFilterParams) ([]string, error) + ReadObject(ctx context.Context, params GCSObjectParams) (GCSReader, error) + GetObjectSize(ctx context.Context, params GCSObjectParams) (int64, error) + SetObjectMetadata(ctx context.Context, params GCSObjectParams, metadata map[string]string) error + DeleteObject(ctx context.Context, params GCSObjectParams) error + DeleteObjectsWithFilter(ctx context.Context, params GCSFilterParams) error + WriteObject(ctx context.Context, params GCSObjectParams, r io.Reader) (int64, error) + ComposeObjects(ctx context.Context, params GCSComposeParams) error + FilterObjects(ctx context.Context, params GCSFilterParams) ([]string, error) } // GCSService holds the cloud.google.com/go/storage client @@ -75,7 +75,6 @@ type GCSAPI interface { // The usage of these closures allow them to be redefined in the testing package, allowing test to be run against this file. type GCSService struct { Client *storage.Client - Ctx context.Context } // NewGCSService returns a GCSSerivce object given a GCloud service account file path. @@ -88,15 +87,14 @@ func NewGCSService(filename string) (*GCSService, error) { service := &GCSService{ Client: client, - Ctx: ctx, } return service, nil } // GetObjectSize returns the byte length of the specified GCS object. -func (service *GCSService) GetObjectSize(params GCSObjectParams) (int64, error) { - attrs, err := service.GetObjectAttrs(params) +func (service *GCSService) GetObjectSize(ctx context.Context, params GCSObjectParams) (int64, error) { + attrs, err := service.GetObjectAttrs(ctx, params) if err != nil { return 0, err } @@ -105,8 +103,8 @@ func (service *GCSService) GetObjectSize(params GCSObjectParams) (int64, error) } // DeleteObjectWithPrefix will delete objects who match the provided filter parameters. -func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error { - names, err := service.FilterObjects(params) +func (service *GCSService) DeleteObjectsWithFilter(ctx context.Context, params GCSFilterParams) error { + names, err := service.FilterObjects(ctx, params) if err != nil { return err } @@ -118,7 +116,7 @@ func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error ID: name, } - err := service.DeleteObject(objectParams) + err := service.DeleteObject(ctx, objectParams) if err != nil { return err } @@ -130,7 +128,7 @@ func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error const COMPOSE_RETRIES = 3 // Compose takes a bucket name, a list of initial source names, and a destination string to compose multiple GCS objects together -func (service *GCSService) compose(bucket string, srcs []string, dst string) error { +func (service *GCSService) compose(ctx context.Context, bucket string, srcs []string, dst string) error { dstParams := GCSObjectParams{ Bucket: bucket, ID: dst, @@ -139,7 +137,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err var crc uint32 for i := 0; i < len(srcs); i++ { objSrcs[i] = service.Client.Bucket(bucket).Object(srcs[i]) - srcAttrs, err := service.GetObjectAttrs(GCSObjectParams{ + srcAttrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{ Bucket: bucket, ID: srcs[i], }) @@ -154,7 +152,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err } } - attrs, err := service.GetObjectAttrs(GCSObjectParams{ + attrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{ Bucket: bucket, ID: srcs[0], }) @@ -163,7 +161,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err } for i := 0; i < COMPOSE_RETRIES; i++ { - dstCRC, err := service.ComposeFrom(objSrcs, dstParams, attrs.ContentType) + dstCRC, err := service.ComposeFrom(ctx, objSrcs, dstParams, attrs.ContentType) if err != nil { return err } @@ -173,7 +171,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err } } - err = service.DeleteObject(GCSObjectParams{ + err = service.DeleteObject(ctx, GCSObjectParams{ Bucket: bucket, ID: dst, }) @@ -190,9 +188,9 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err // can combined in a compose operation. GCloud storage's limit is 32. const MAX_OBJECT_COMPOSITION = 32 -func (service *GCSService) recursiveCompose(srcs []string, params GCSComposeParams, lvl int) error { +func (service *GCSService) recursiveCompose(ctx context.Context, srcs []string, params GCSComposeParams, lvl int) error { if len(srcs) <= MAX_OBJECT_COMPOSITION { - err := service.compose(params.Bucket, srcs, params.Destination) + err := service.compose(ctx, params.Bucket, srcs, params.Destination) if err != nil { return err } @@ -204,7 +202,7 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara Prefix: prefix, } - err = service.DeleteObjectsWithFilter(filterParams) + err = service.DeleteObjectsWithFilter(ctx, filterParams) if err != nil { return err } @@ -223,7 +221,7 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara } tmpDst := fmt.Sprintf("%s_tmp_%d_%d", params.Destination, lvl, i) - err := service.compose(params.Bucket, srcs[start:end], tmpDst) + err := service.compose(ctx, params.Bucket, srcs[start:end], tmpDst) if err != nil { return err } @@ -231,15 +229,15 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara tmpSrcs[i] = tmpDst } - return service.recursiveCompose(tmpSrcs, params, lvl+1) + return service.recursiveCompose(ctx, tmpSrcs, params, lvl+1) } // ComposeObjects composes multiple GCS objects in to a single object. // Since GCS limits composition to a max of 32 objects, additional logic // has been added to chunk objects in to groups of 32 and then recursively // compose those objects together. -func (service *GCSService) ComposeObjects(params GCSComposeParams) error { - err := service.recursiveCompose(params.Sources, params, 0) +func (service *GCSService) ComposeObjects(ctx context.Context, params GCSComposeParams) error { + err := service.recursiveCompose(ctx, params.Sources, params, 0) if err != nil { return err @@ -249,10 +247,10 @@ func (service *GCSService) ComposeObjects(params GCSComposeParams) error { } // GetObjectAttrs returns the associated attributes of a GCS object. See: https://godoc.org/cloud.google.com/go/storage#ObjectAttrs -func (service *GCSService) GetObjectAttrs(params GCSObjectParams) (*storage.ObjectAttrs, error) { +func (service *GCSService) GetObjectAttrs(ctx context.Context, params GCSObjectParams) (*storage.ObjectAttrs, error) { obj := service.Client.Bucket(params.Bucket).Object(params.ID) - attrs, err := obj.Attrs(service.Ctx) + attrs, err := obj.Attrs(ctx) if err != nil { return nil, err } @@ -262,8 +260,8 @@ func (service *GCSService) GetObjectAttrs(params GCSObjectParams) (*storage.Obje } // ReadObject reads a GCSObjectParams, returning a GCSReader object if successful, and an error otherwise -func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error) { - r, err := service.Client.Bucket(params.Bucket).Object(params.ID).NewReader(service.Ctx) +func (service *GCSService) ReadObject(ctx context.Context, params GCSObjectParams) (GCSReader, error) { + r, err := service.Client.Bucket(params.Bucket).Object(params.ID).NewReader(ctx) if err != nil { return nil, err } @@ -272,25 +270,25 @@ func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error) } // SetObjectMetadata reads a GCSObjectParams and a map of metedata, returning a nil on sucess and an error otherwise -func (service *GCSService) SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error { +func (service *GCSService) SetObjectMetadata(ctx context.Context, params GCSObjectParams, metadata map[string]string) error { attrs := storage.ObjectAttrsToUpdate{ Metadata: metadata, } - _, err := service.Client.Bucket(params.Bucket).Object(params.ID).Update(service.Ctx, attrs) + _, err := service.Client.Bucket(params.Bucket).Object(params.ID).Update(ctx, attrs) return err } // DeleteObject deletes the object defined by GCSObjectParams -func (service *GCSService) DeleteObject(params GCSObjectParams) error { - return service.Client.Bucket(params.Bucket).Object(params.ID).Delete(service.Ctx) +func (service *GCSService) DeleteObject(ctx context.Context, params GCSObjectParams) error { + return service.Client.Bucket(params.Bucket).Object(params.ID).Delete(ctx) } // Write object writes the file set out by the GCSObjectParams -func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int64, error) { +func (service *GCSService) WriteObject(ctx context.Context, params GCSObjectParams, r io.Reader) (int64, error) { obj := service.Client.Bucket(params.Bucket).Object(params.ID) - w := obj.NewWriter(service.Ctx) + w := obj.NewWriter(ctx) defer w.Close() @@ -303,16 +301,16 @@ func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int } // ComposeFrom composes multiple object types together, -func (service *GCSService) ComposeFrom(objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) { +func (service *GCSService) ComposeFrom(ctx context.Context, objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) { dstObj := service.Client.Bucket(dstParams.Bucket).Object(dstParams.ID) c := dstObj.ComposerFrom(objSrcs...) c.ContentType = contentType - _, err := c.Run(service.Ctx) + _, err := c.Run(ctx) if err != nil { return 0, err } - dstAttrs, err := dstObj.Attrs(service.Ctx) + dstAttrs, err := dstObj.Attrs(ctx) if err != nil { return 0, err } @@ -325,14 +323,14 @@ func (service *GCSService) ComposeFrom(objSrcs []*storage.ObjectHandle, dstParam // is zero based. The format [uid]_tmp_[recursion_lvl]_[chunk_idx] can also be used to // specify objects that have been composed in a recursive fashion. These different formats // are usedd to ensure that objects are composed in the correct order. -func (service *GCSService) FilterObjects(params GCSFilterParams) ([]string, error) { +func (service *GCSService) FilterObjects(ctx context.Context, params GCSFilterParams) ([]string, error) { bkt := service.Client.Bucket(params.Bucket) q := storage.Query{ Prefix: params.Prefix, Versions: false, } - it := bkt.Objects(service.Ctx, &q) + it := bkt.Objects(ctx, &q) names := make([]string, 0) loop: for { diff --git a/gcsstore/gcsstore.go b/gcsstore/gcsstore.go index a52bba6..4d72386 100644 --- a/gcsstore/gcsstore.go +++ b/gcsstore/gcsstore.go @@ -14,9 +14,12 @@ import ( "bytes" "encoding/json" "fmt" + "golang.org/x/net/context" "io" "strconv" "strings" + "sync" + "sync/atomic" "cloud.google.com/go/storage" "github.com/tus/tusd" @@ -55,7 +58,8 @@ func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) { info.ID = uid.Uid() } - err = store.writeInfo(info.ID, info) + ctx := context.Background() + err = store.writeInfo(ctx, info.ID, info) if err != nil { return info.ID, err } @@ -70,7 +74,8 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, Prefix: prefix, } - names, err := store.Service.FilterObjects(filterParams) + ctx := context.Background() + names, err := store.Service.FilterObjects(ctx, filterParams) if err != nil { return 0, err } @@ -95,7 +100,7 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, ID: cid, } - n, err := store.Service.WriteObject(objectParams, src) + n, err := store.Service.WriteObject(ctx, objectParams, src) if err != nil { return 0, err } @@ -103,6 +108,8 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, return n, err } +const CONCURRENT_SIZE_REQUESTS = 32 + func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { info := tusd.FileInfo{} i := fmt.Sprintf("%s.info", id) @@ -112,7 +119,10 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { ID: i, } - r, err := store.Service.ReadObject(params) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + r, err := store.Service.ReadObject(ctx, params) if err != nil { if err == storage.ErrObjectNotExist { return info, tusd.ErrNotFound @@ -136,37 +146,69 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { Prefix: prefix, } - names, err := store.Service.FilterObjects(filterParams) + names, err := store.Service.FilterObjects(ctx, filterParams) if err != nil { return info, err } var offset int64 = 0 + var firstError error = nil + var wg sync.WaitGroup + + sem := make(chan struct{}, CONCURRENT_SIZE_REQUESTS) + errChan := make(chan error) + + go func() { + for err := range errChan { + if err != context.Canceled && firstError == nil { + firstError = err + cancel() + } + } + }() + for _, name := range names { + sem <- struct{}{} + wg.Add(1) params = GCSObjectParams{ Bucket: store.Bucket, ID: name, } - size, err := store.Service.GetObjectSize(params) - if err != nil { - return info, err - } + go func() { + defer func() { + <-sem + wg.Done() + }() - offset += size + size, err := store.Service.GetObjectSize(ctx, params) + + if err != nil { + errChan <- err + return + } + + atomic.AddInt64(&offset, size) + }() + } + + wg.Wait() + close(errChan) + + if firstError != nil { + return info, firstError } info.Offset = offset - err = store.writeInfo(id, info) + err = store.writeInfo(ctx, id, info) if err != nil { return info, err } return info, nil - } -func (store GCSStore) writeInfo(id string, info tusd.FileInfo) error { +func (store GCSStore) writeInfo(ctx context.Context, id string, info tusd.FileInfo) error { data, err := json.Marshal(info) if err != nil { return err @@ -180,7 +222,7 @@ func (store GCSStore) writeInfo(id string, info tusd.FileInfo) error { ID: i, } - _, err = store.Service.WriteObject(params, r) + _, err = store.Service.WriteObject(ctx, params, r) if err != nil { return err } @@ -195,7 +237,8 @@ func (store GCSStore) FinishUpload(id string) error { Prefix: prefix, } - names, err := store.Service.FilterObjects(filterParams) + ctx := context.Background() + names, err := store.Service.FilterObjects(ctx, filterParams) if err != nil { return err } @@ -206,12 +249,12 @@ func (store GCSStore) FinishUpload(id string) error { Sources: names, } - err = store.Service.ComposeObjects(composeParams) + err = store.Service.ComposeObjects(ctx, composeParams) if err != nil { return err } - err = store.Service.DeleteObjectsWithFilter(filterParams) + err = store.Service.DeleteObjectsWithFilter(ctx, filterParams) if err != nil { return err } @@ -226,7 +269,7 @@ func (store GCSStore) FinishUpload(id string) error { ID: id, } - err = store.Service.SetObjectMetadata(objectParams, info.MetaData) + err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData) if err != nil { return err } @@ -240,7 +283,8 @@ func (store GCSStore) Terminate(id string) error { Prefix: id, } - err := store.Service.DeleteObjectsWithFilter(filterParams) + ctx := context.Background() + err := store.Service.DeleteObjectsWithFilter(ctx, filterParams) if err != nil { return err } @@ -254,7 +298,8 @@ func (store GCSStore) GetReader(id string) (io.Reader, error) { ID: id, } - r, err := store.Service.ReadObject(params) + ctx := context.Background() + r, err := store.Service.ReadObject(ctx, params) if err != nil { return nil, err } From 82e140d317743ad4b0c5a6a40429cd47d6547347 Mon Sep 17 00:00:00 2001 From: Tom Berger Date: Wed, 25 Jul 2018 12:51:07 -0400 Subject: [PATCH 2/3] Pass GCS object params directly to goroutine --- gcsstore/gcsstore.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/gcsstore/gcsstore.go b/gcsstore/gcsstore.go index 4d72386..589ef19 100644 --- a/gcsstore/gcsstore.go +++ b/gcsstore/gcsstore.go @@ -119,9 +119,7 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { ID: i, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - + ctx := context.Background() r, err := store.Service.ReadObject(ctx, params) if err != nil { if err == storage.ErrObjectNotExist { @@ -157,6 +155,8 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { sem := make(chan struct{}, CONCURRENT_SIZE_REQUESTS) errChan := make(chan error) + ctxCancel, cancel := context.WithCancel(ctx) + defer cancel() go func() { for err := range errChan { @@ -175,13 +175,13 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { ID: name, } - go func() { + go func(params GCSObjectParams) { defer func() { <-sem wg.Done() }() - size, err := store.Service.GetObjectSize(ctx, params) + size, err := store.Service.GetObjectSize(ctxCancel, params) if err != nil { errChan <- err @@ -189,7 +189,7 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { } atomic.AddInt64(&offset, size) - }() + }(params) } wg.Wait() From 9406b5e516760973813eafd8f975a64fc4b32a7c Mon Sep 17 00:00:00 2001 From: Tom Berger Date: Wed, 25 Jul 2018 12:51:38 -0400 Subject: [PATCH 3/3] Update tests to pass context and expect GetObjectSize in any order --- gcsstore/gcsservice_test.go | 32 ++++++----------- gcsstore/gcsstore_mock_test.go | 65 +++++++++++++++++----------------- gcsstore/gcsstore_test.go | 58 ++++++++++++++++++------------ 3 files changed, 80 insertions(+), 75 deletions(-) diff --git a/gcsstore/gcsservice_test.go b/gcsstore/gcsservice_test.go index b9093ef..ad9573f 100644 --- a/gcsstore/gcsservice_test.go +++ b/gcsstore/gcsservice_test.go @@ -2,7 +2,7 @@ package gcsstore_test import ( "bytes" - "context" + "golang.org/x/net/context" "testing" "gopkg.in/h2non/gock.v1" @@ -47,10 +47,9 @@ func TestGetObjectSize(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - size, err := service.GetObjectSize(GCSObjectParams{ + size, err := service.GetObjectSize(ctx, GCSObjectParams{ Bucket: "test-bucket", ID: "test-name", }) @@ -94,10 +93,9 @@ func TestDeleteObjectWithFilter(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - err = service.DeleteObjectsWithFilter(GCSFilterParams{ + err = service.DeleteObjectsWithFilter(ctx, GCSFilterParams{ Bucket: "test-bucket", Prefix: "test-prefix", }) @@ -180,10 +178,9 @@ func TestComposeObjects(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - err = service.ComposeObjects(GCSComposeParams{ + err = service.ComposeObjects(ctx, GCSComposeParams{ Bucket: "test-bucket", Sources: []string{"test1", "test2", "test3"}, Destination: "test_all", @@ -222,10 +219,9 @@ func TestGetObjectAttrs(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - attrs, err := service.GetObjectAttrs(GCSObjectParams{ + attrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{ Bucket: "test-bucket", ID: "test-name", }) @@ -266,10 +262,9 @@ func TestReadObject(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - reader, err := service.ReadObject(GCSObjectParams{ + reader, err := service.ReadObject(ctx, GCSObjectParams{ Bucket: "test-bucket", ID: "test-name", }) @@ -304,10 +299,9 @@ func TestSetObjectMetadata(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - err = service.SetObjectMetadata(GCSObjectParams{ + err = service.SetObjectMetadata(ctx, GCSObjectParams{ Bucket: "test-bucket", ID: "test-name", }, map[string]string{"test": "metadata", "fake": "test"}) @@ -343,10 +337,9 @@ func TestDeleteObject(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - err = service.DeleteObject(GCSObjectParams{ + err = service.DeleteObject(ctx, GCSObjectParams{ Bucket: "test-bucket", ID: "test-name", }) @@ -376,12 +369,11 @@ func TestWriteObject(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } reader := bytes.NewReader([]byte{1}) - size, err := service.WriteObject(GCSObjectParams{ + size, err := service.WriteObject(ctx, GCSObjectParams{ Bucket: "test-bucket", ID: "test-name", }, reader) @@ -428,10 +420,9 @@ func TestComposeFrom(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - crc, err := service.ComposeFrom([]*storage.ObjectHandle{client.Bucket("test-bucket").Object("my-object")}, GCSObjectParams{ + crc, err := service.ComposeFrom(ctx, []*storage.ObjectHandle{client.Bucket("test-bucket").Object("my-object")}, GCSObjectParams{ Bucket: "test-bucket", ID: "my-object", }, "text") @@ -478,10 +469,9 @@ func TestFilterObject(t *testing.T) { service := GCSService{ Client: client, - Ctx: ctx, } - objects, err := service.FilterObjects(GCSFilterParams{ + objects, err := service.FilterObjects(ctx, GCSFilterParams{ Bucket: "test-bucket", Prefix: "test-prefix", }) diff --git a/gcsstore/gcsstore_mock_test.go b/gcsstore/gcsstore_mock_test.go index 74549ff..10da5b5 100644 --- a/gcsstore/gcsstore_mock_test.go +++ b/gcsstore/gcsstore_mock_test.go @@ -4,6 +4,7 @@ package gcsstore_test import ( + context "context" gomock "github.com/golang/mock/gomock" gcsstore "github.com/tus/tusd/gcsstore" io "io" @@ -102,86 +103,86 @@ func (_m *MockGCSAPI) EXPECT() *_MockGCSAPIRecorder { return _m.recorder } -func (_m *MockGCSAPI) ComposeObjects(_param0 gcsstore.GCSComposeParams) error { - ret := _m.ctrl.Call(_m, "ComposeObjects", _param0) +func (_m *MockGCSAPI) ComposeObjects(_param0 context.Context, _param1 gcsstore.GCSComposeParams) error { + ret := _m.ctrl.Call(_m, "ComposeObjects", _param0, _param1) ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockGCSAPIRecorder) ComposeObjects(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "ComposeObjects", arg0) +func (_mr *_MockGCSAPIRecorder) ComposeObjects(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ComposeObjects", arg0, arg1) } -func (_m *MockGCSAPI) DeleteObject(_param0 gcsstore.GCSObjectParams) error { - ret := _m.ctrl.Call(_m, "DeleteObject", _param0) +func (_m *MockGCSAPI) DeleteObject(_param0 context.Context, _param1 gcsstore.GCSObjectParams) error { + ret := _m.ctrl.Call(_m, "DeleteObject", _param0, _param1) ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockGCSAPIRecorder) DeleteObject(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObject", arg0) +func (_mr *_MockGCSAPIRecorder) DeleteObject(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObject", arg0, arg1) } -func (_m *MockGCSAPI) DeleteObjectsWithFilter(_param0 gcsstore.GCSFilterParams) error { - ret := _m.ctrl.Call(_m, "DeleteObjectsWithFilter", _param0) +func (_m *MockGCSAPI) DeleteObjectsWithFilter(_param0 context.Context, _param1 gcsstore.GCSFilterParams) error { + ret := _m.ctrl.Call(_m, "DeleteObjectsWithFilter", _param0, _param1) ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockGCSAPIRecorder) DeleteObjectsWithFilter(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObjectsWithFilter", arg0) +func (_mr *_MockGCSAPIRecorder) DeleteObjectsWithFilter(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObjectsWithFilter", arg0, arg1) } -func (_m *MockGCSAPI) FilterObjects(_param0 gcsstore.GCSFilterParams) ([]string, error) { - ret := _m.ctrl.Call(_m, "FilterObjects", _param0) +func (_m *MockGCSAPI) FilterObjects(_param0 context.Context, _param1 gcsstore.GCSFilterParams) ([]string, error) { + ret := _m.ctrl.Call(_m, "FilterObjects", _param0, _param1) ret0, _ := ret[0].([]string) ret1, _ := ret[1].(error) return ret0, ret1 } -func (_mr *_MockGCSAPIRecorder) FilterObjects(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "FilterObjects", arg0) +func (_mr *_MockGCSAPIRecorder) FilterObjects(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "FilterObjects", arg0, arg1) } -func (_m *MockGCSAPI) GetObjectSize(_param0 gcsstore.GCSObjectParams) (int64, error) { - ret := _m.ctrl.Call(_m, "GetObjectSize", _param0) +func (_m *MockGCSAPI) GetObjectSize(_param0 context.Context, _param1 gcsstore.GCSObjectParams) (int64, error) { + ret := _m.ctrl.Call(_m, "GetObjectSize", _param0, _param1) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } -func (_mr *_MockGCSAPIRecorder) GetObjectSize(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObjectSize", arg0) +func (_mr *_MockGCSAPIRecorder) GetObjectSize(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObjectSize", arg0, arg1) } -func (_m *MockGCSAPI) ReadObject(_param0 gcsstore.GCSObjectParams) (gcsstore.GCSReader, error) { - ret := _m.ctrl.Call(_m, "ReadObject", _param0) +func (_m *MockGCSAPI) ReadObject(_param0 context.Context, _param1 gcsstore.GCSObjectParams) (gcsstore.GCSReader, error) { + ret := _m.ctrl.Call(_m, "ReadObject", _param0, _param1) ret0, _ := ret[0].(gcsstore.GCSReader) ret1, _ := ret[1].(error) return ret0, ret1 } -func (_mr *_MockGCSAPIRecorder) ReadObject(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "ReadObject", arg0) +func (_mr *_MockGCSAPIRecorder) ReadObject(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ReadObject", arg0, arg1) } -func (_m *MockGCSAPI) SetObjectMetadata(_param0 gcsstore.GCSObjectParams, _param1 map[string]string) error { - ret := _m.ctrl.Call(_m, "SetObjectMetadata", _param0, _param1) +func (_m *MockGCSAPI) SetObjectMetadata(_param0 context.Context, _param1 gcsstore.GCSObjectParams, _param2 map[string]string) error { + ret := _m.ctrl.Call(_m, "SetObjectMetadata", _param0, _param1, _param2) ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockGCSAPIRecorder) SetObjectMetadata(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "SetObjectMetadata", arg0, arg1) +func (_mr *_MockGCSAPIRecorder) SetObjectMetadata(arg0, arg1, arg2 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "SetObjectMetadata", arg0, arg1, arg2) } -func (_m *MockGCSAPI) WriteObject(_param0 gcsstore.GCSObjectParams, _param1 io.Reader) (int64, error) { - ret := _m.ctrl.Call(_m, "WriteObject", _param0, _param1) +func (_m *MockGCSAPI) WriteObject(_param0 context.Context, _param1 gcsstore.GCSObjectParams, _param2 io.Reader) (int64, error) { + ret := _m.ctrl.Call(_m, "WriteObject", _param0, _param1, _param2) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } -func (_mr *_MockGCSAPIRecorder) WriteObject(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "WriteObject", arg0, arg1) +func (_mr *_MockGCSAPIRecorder) WriteObject(arg0, arg1, arg2 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "WriteObject", arg0, arg1, arg2) } diff --git a/gcsstore/gcsstore_test.go b/gcsstore/gcsstore_test.go index 45c0545..fbefcc2 100644 --- a/gcsstore/gcsstore_test.go +++ b/gcsstore/gcsstore_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "golang.org/x/net/context" "testing" "cloud.google.com/go/storage" @@ -55,7 +56,8 @@ func TestNewUpload(t *testing.T) { ID: fmt.Sprintf("%s.info", mockID), } - service.EXPECT().WriteObject(params, r).Return(int64(r.Len()), nil) + ctx := context.Background() + service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil) id, err := store.NewUpload(mockTusdInfo) assert.Nil(err) @@ -130,15 +132,19 @@ func TestGetInfo(t *testing.T) { infoR := bytes.NewReader(offsetInfoData) + ctx := context.Background() gomock.InOrder( - service.EXPECT().ReadObject(params).Return(r, nil), - service.EXPECT().FilterObjects(filterParams).Return(mockPartials, nil), - service.EXPECT().GetObjectSize(mockObjectParams0).Return(size, nil), - service.EXPECT().GetObjectSize(mockObjectParams1).Return(size, nil), - service.EXPECT().GetObjectSize(mockObjectParams2).Return(size, nil), - service.EXPECT().WriteObject(params, infoR).Return(int64(len(offsetInfoData)), nil), + service.EXPECT().ReadObject(ctx, params).Return(r, nil), + service.EXPECT().FilterObjects(ctx, filterParams).Return(mockPartials, nil), ) + ctxCancel, _ := context.WithCancel(ctx) + service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams0).Return(size, nil) + service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams1).Return(size, nil) + lastGetObjectSize := service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams2).Return(size, nil) + + service.EXPECT().WriteObject(ctx, params, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize) + info, err := store.GetInfo(mockID) assert.Nil(err) assert.Equal(mockTusdInfo, info) @@ -157,8 +163,9 @@ func TestGetInfoNotFound(t *testing.T) { ID: fmt.Sprintf("%s.info", mockID), } + ctx := context.Background() gomock.InOrder( - service.EXPECT().ReadObject(params).Return(nil, storage.ErrObjectNotExist), + service.EXPECT().ReadObject(ctx, params).Return(nil, storage.ErrObjectNotExist), ) _, err := store.GetInfo(mockID) @@ -205,7 +212,8 @@ func TestGetReader(t *testing.T) { r := MockGetReader{} - service.EXPECT().ReadObject(params).Return(r, nil) + ctx := context.Background() + service.EXPECT().ReadObject(ctx, params).Return(r, nil) reader, err := store.GetReader(mockID) assert.Nil(err) @@ -231,7 +239,8 @@ func TestTerminate(t *testing.T) { Prefix: mockID, } - service.EXPECT().DeleteObjectsWithFilter(filterParams).Return(nil) + ctx := context.Background() + service.EXPECT().DeleteObjectsWithFilter(ctx, filterParams).Return(nil) err := store.Terminate(mockID) assert.Nil(err) @@ -302,19 +311,23 @@ func TestFinishUpload(t *testing.T) { "foo": "bar", } + ctx := context.Background() gomock.InOrder( - service.EXPECT().FilterObjects(filterParams).Return(mockPartials, nil), - service.EXPECT().ComposeObjects(composeParams).Return(nil), - service.EXPECT().DeleteObjectsWithFilter(filterParams).Return(nil), - service.EXPECT().ReadObject(infoParams).Return(r, nil), - service.EXPECT().FilterObjects(filterParams2).Return(mockPartials, nil), - service.EXPECT().GetObjectSize(mockObjectParams0).Return(size, nil), - service.EXPECT().GetObjectSize(mockObjectParams1).Return(size, nil), - service.EXPECT().GetObjectSize(mockObjectParams2).Return(size, nil), - service.EXPECT().WriteObject(infoParams, infoR).Return(int64(len(offsetInfoData)), nil), - service.EXPECT().SetObjectMetadata(objectParams, metadata).Return(nil), + service.EXPECT().FilterObjects(ctx, filterParams).Return(mockPartials, nil), + service.EXPECT().ComposeObjects(ctx, composeParams).Return(nil), + service.EXPECT().DeleteObjectsWithFilter(ctx, filterParams).Return(nil), + service.EXPECT().ReadObject(ctx, infoParams).Return(r, nil), + service.EXPECT().FilterObjects(ctx, filterParams2).Return(mockPartials, nil), ) + ctxCancel, _ := context.WithCancel(ctx) + service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams0).Return(size, nil) + service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams1).Return(size, nil) + lastGetObjectSize := service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams2).Return(size, nil) + + writeObject := service.EXPECT().WriteObject(ctx, infoParams, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize) + service.EXPECT().SetObjectMetadata(ctx, objectParams, metadata).Return(nil).After(writeObject) + err = store.FinishUpload(mockID) assert.Nil(err) } @@ -378,9 +391,10 @@ func TestWriteChunk(t *testing.T) { rGet := bytes.NewReader([]byte(mockReaderData)) + ctx := context.Background() gomock.InOrder( - service.EXPECT().FilterObjects(filterParams).Return(partials, nil), - service.EXPECT().WriteObject(writeObjectParams, rGet).Return(int64(len(mockReaderData)), nil), + service.EXPECT().FilterObjects(ctx, filterParams).Return(partials, nil), + service.EXPECT().WriteObject(ctx, writeObjectParams, rGet).Return(int64(len(mockReaderData)), nil), ) reader := bytes.NewReader([]byte(mockReaderData))