Merge pull request #199 from vimeo/gcs-getinfo-parallel
Add support for concurrent GetObjectSize requests
This commit is contained in:
commit
cbaba850a7
|
@ -59,14 +59,14 @@ type GCSReader interface {
|
||||||
// operations that are required to enable the tus protocol
|
// operations that are required to enable the tus protocol
|
||||||
// to work with Google's cloud storage.
|
// to work with Google's cloud storage.
|
||||||
type GCSAPI interface {
|
type GCSAPI interface {
|
||||||
ReadObject(params GCSObjectParams) (GCSReader, error)
|
ReadObject(ctx context.Context, params GCSObjectParams) (GCSReader, error)
|
||||||
GetObjectSize(params GCSObjectParams) (int64, error)
|
GetObjectSize(ctx context.Context, params GCSObjectParams) (int64, error)
|
||||||
SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error
|
SetObjectMetadata(ctx context.Context, params GCSObjectParams, metadata map[string]string) error
|
||||||
DeleteObject(params GCSObjectParams) error
|
DeleteObject(ctx context.Context, params GCSObjectParams) error
|
||||||
DeleteObjectsWithFilter(params GCSFilterParams) error
|
DeleteObjectsWithFilter(ctx context.Context, params GCSFilterParams) error
|
||||||
WriteObject(params GCSObjectParams, r io.Reader) (int64, error)
|
WriteObject(ctx context.Context, params GCSObjectParams, r io.Reader) (int64, error)
|
||||||
ComposeObjects(params GCSComposeParams) error
|
ComposeObjects(ctx context.Context, params GCSComposeParams) error
|
||||||
FilterObjects(params GCSFilterParams) ([]string, error)
|
FilterObjects(ctx context.Context, params GCSFilterParams) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GCSService holds the cloud.google.com/go/storage client
|
// GCSService holds the cloud.google.com/go/storage client
|
||||||
|
@ -75,7 +75,6 @@ type GCSAPI interface {
|
||||||
// The usage of these closures allow them to be redefined in the testing package, allowing test to be run against this file.
|
// The usage of these closures allow them to be redefined in the testing package, allowing test to be run against this file.
|
||||||
type GCSService struct {
|
type GCSService struct {
|
||||||
Client *storage.Client
|
Client *storage.Client
|
||||||
Ctx context.Context
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGCSService returns a GCSSerivce object given a GCloud service account file path.
|
// NewGCSService returns a GCSSerivce object given a GCloud service account file path.
|
||||||
|
@ -88,15 +87,14 @@ func NewGCSService(filename string) (*GCSService, error) {
|
||||||
|
|
||||||
service := &GCSService{
|
service := &GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return service, nil
|
return service, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectSize returns the byte length of the specified GCS object.
|
// GetObjectSize returns the byte length of the specified GCS object.
|
||||||
func (service *GCSService) GetObjectSize(params GCSObjectParams) (int64, error) {
|
func (service *GCSService) GetObjectSize(ctx context.Context, params GCSObjectParams) (int64, error) {
|
||||||
attrs, err := service.GetObjectAttrs(params)
|
attrs, err := service.GetObjectAttrs(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -105,8 +103,8 @@ func (service *GCSService) GetObjectSize(params GCSObjectParams) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjectWithPrefix will delete objects who match the provided filter parameters.
|
// DeleteObjectWithPrefix will delete objects who match the provided filter parameters.
|
||||||
func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error {
|
func (service *GCSService) DeleteObjectsWithFilter(ctx context.Context, params GCSFilterParams) error {
|
||||||
names, err := service.FilterObjects(params)
|
names, err := service.FilterObjects(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -118,7 +116,7 @@ func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error
|
||||||
ID: name,
|
ID: name,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := service.DeleteObject(objectParams)
|
err := service.DeleteObject(ctx, objectParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -130,7 +128,7 @@ func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error
|
||||||
const COMPOSE_RETRIES = 3
|
const COMPOSE_RETRIES = 3
|
||||||
|
|
||||||
// Compose takes a bucket name, a list of initial source names, and a destination string to compose multiple GCS objects together
|
// Compose takes a bucket name, a list of initial source names, and a destination string to compose multiple GCS objects together
|
||||||
func (service *GCSService) compose(bucket string, srcs []string, dst string) error {
|
func (service *GCSService) compose(ctx context.Context, bucket string, srcs []string, dst string) error {
|
||||||
dstParams := GCSObjectParams{
|
dstParams := GCSObjectParams{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
ID: dst,
|
ID: dst,
|
||||||
|
@ -139,7 +137,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
|
||||||
var crc uint32
|
var crc uint32
|
||||||
for i := 0; i < len(srcs); i++ {
|
for i := 0; i < len(srcs); i++ {
|
||||||
objSrcs[i] = service.Client.Bucket(bucket).Object(srcs[i])
|
objSrcs[i] = service.Client.Bucket(bucket).Object(srcs[i])
|
||||||
srcAttrs, err := service.GetObjectAttrs(GCSObjectParams{
|
srcAttrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
ID: srcs[i],
|
ID: srcs[i],
|
||||||
})
|
})
|
||||||
|
@ -154,7 +152,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
attrs, err := service.GetObjectAttrs(GCSObjectParams{
|
attrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
ID: srcs[0],
|
ID: srcs[0],
|
||||||
})
|
})
|
||||||
|
@ -163,7 +161,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < COMPOSE_RETRIES; i++ {
|
for i := 0; i < COMPOSE_RETRIES; i++ {
|
||||||
dstCRC, err := service.ComposeFrom(objSrcs, dstParams, attrs.ContentType)
|
dstCRC, err := service.ComposeFrom(ctx, objSrcs, dstParams, attrs.ContentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -173,7 +171,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = service.DeleteObject(GCSObjectParams{
|
err = service.DeleteObject(ctx, GCSObjectParams{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
ID: dst,
|
ID: dst,
|
||||||
})
|
})
|
||||||
|
@ -190,9 +188,9 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
|
||||||
// can combined in a compose operation. GCloud storage's limit is 32.
|
// can combined in a compose operation. GCloud storage's limit is 32.
|
||||||
const MAX_OBJECT_COMPOSITION = 32
|
const MAX_OBJECT_COMPOSITION = 32
|
||||||
|
|
||||||
func (service *GCSService) recursiveCompose(srcs []string, params GCSComposeParams, lvl int) error {
|
func (service *GCSService) recursiveCompose(ctx context.Context, srcs []string, params GCSComposeParams, lvl int) error {
|
||||||
if len(srcs) <= MAX_OBJECT_COMPOSITION {
|
if len(srcs) <= MAX_OBJECT_COMPOSITION {
|
||||||
err := service.compose(params.Bucket, srcs, params.Destination)
|
err := service.compose(ctx, params.Bucket, srcs, params.Destination)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -204,7 +202,7 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = service.DeleteObjectsWithFilter(filterParams)
|
err = service.DeleteObjectsWithFilter(ctx, filterParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -223,7 +221,7 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara
|
||||||
}
|
}
|
||||||
|
|
||||||
tmpDst := fmt.Sprintf("%s_tmp_%d_%d", params.Destination, lvl, i)
|
tmpDst := fmt.Sprintf("%s_tmp_%d_%d", params.Destination, lvl, i)
|
||||||
err := service.compose(params.Bucket, srcs[start:end], tmpDst)
|
err := service.compose(ctx, params.Bucket, srcs[start:end], tmpDst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -231,15 +229,15 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara
|
||||||
tmpSrcs[i] = tmpDst
|
tmpSrcs[i] = tmpDst
|
||||||
}
|
}
|
||||||
|
|
||||||
return service.recursiveCompose(tmpSrcs, params, lvl+1)
|
return service.recursiveCompose(ctx, tmpSrcs, params, lvl+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ComposeObjects composes multiple GCS objects in to a single object.
|
// ComposeObjects composes multiple GCS objects in to a single object.
|
||||||
// Since GCS limits composition to a max of 32 objects, additional logic
|
// Since GCS limits composition to a max of 32 objects, additional logic
|
||||||
// has been added to chunk objects in to groups of 32 and then recursively
|
// has been added to chunk objects in to groups of 32 and then recursively
|
||||||
// compose those objects together.
|
// compose those objects together.
|
||||||
func (service *GCSService) ComposeObjects(params GCSComposeParams) error {
|
func (service *GCSService) ComposeObjects(ctx context.Context, params GCSComposeParams) error {
|
||||||
err := service.recursiveCompose(params.Sources, params, 0)
|
err := service.recursiveCompose(ctx, params.Sources, params, 0)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -249,10 +247,10 @@ func (service *GCSService) ComposeObjects(params GCSComposeParams) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectAttrs returns the associated attributes of a GCS object. See: https://godoc.org/cloud.google.com/go/storage#ObjectAttrs
|
// GetObjectAttrs returns the associated attributes of a GCS object. See: https://godoc.org/cloud.google.com/go/storage#ObjectAttrs
|
||||||
func (service *GCSService) GetObjectAttrs(params GCSObjectParams) (*storage.ObjectAttrs, error) {
|
func (service *GCSService) GetObjectAttrs(ctx context.Context, params GCSObjectParams) (*storage.ObjectAttrs, error) {
|
||||||
obj := service.Client.Bucket(params.Bucket).Object(params.ID)
|
obj := service.Client.Bucket(params.Bucket).Object(params.ID)
|
||||||
|
|
||||||
attrs, err := obj.Attrs(service.Ctx)
|
attrs, err := obj.Attrs(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -262,8 +260,8 @@ func (service *GCSService) GetObjectAttrs(params GCSObjectParams) (*storage.Obje
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadObject reads a GCSObjectParams, returning a GCSReader object if successful, and an error otherwise
|
// ReadObject reads a GCSObjectParams, returning a GCSReader object if successful, and an error otherwise
|
||||||
func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error) {
|
func (service *GCSService) ReadObject(ctx context.Context, params GCSObjectParams) (GCSReader, error) {
|
||||||
r, err := service.Client.Bucket(params.Bucket).Object(params.ID).NewReader(service.Ctx)
|
r, err := service.Client.Bucket(params.Bucket).Object(params.ID).NewReader(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -272,25 +270,25 @@ func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetObjectMetadata reads a GCSObjectParams and a map of metedata, returning a nil on sucess and an error otherwise
|
// SetObjectMetadata reads a GCSObjectParams and a map of metedata, returning a nil on sucess and an error otherwise
|
||||||
func (service *GCSService) SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error {
|
func (service *GCSService) SetObjectMetadata(ctx context.Context, params GCSObjectParams, metadata map[string]string) error {
|
||||||
attrs := storage.ObjectAttrsToUpdate{
|
attrs := storage.ObjectAttrsToUpdate{
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
}
|
}
|
||||||
_, err := service.Client.Bucket(params.Bucket).Object(params.ID).Update(service.Ctx, attrs)
|
_, err := service.Client.Bucket(params.Bucket).Object(params.ID).Update(ctx, attrs)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObject deletes the object defined by GCSObjectParams
|
// DeleteObject deletes the object defined by GCSObjectParams
|
||||||
func (service *GCSService) DeleteObject(params GCSObjectParams) error {
|
func (service *GCSService) DeleteObject(ctx context.Context, params GCSObjectParams) error {
|
||||||
return service.Client.Bucket(params.Bucket).Object(params.ID).Delete(service.Ctx)
|
return service.Client.Bucket(params.Bucket).Object(params.ID).Delete(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write object writes the file set out by the GCSObjectParams
|
// Write object writes the file set out by the GCSObjectParams
|
||||||
func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int64, error) {
|
func (service *GCSService) WriteObject(ctx context.Context, params GCSObjectParams, r io.Reader) (int64, error) {
|
||||||
obj := service.Client.Bucket(params.Bucket).Object(params.ID)
|
obj := service.Client.Bucket(params.Bucket).Object(params.ID)
|
||||||
|
|
||||||
w := obj.NewWriter(service.Ctx)
|
w := obj.NewWriter(ctx)
|
||||||
|
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
|
|
||||||
|
@ -303,16 +301,16 @@ func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int
|
||||||
}
|
}
|
||||||
|
|
||||||
// ComposeFrom composes multiple object types together,
|
// ComposeFrom composes multiple object types together,
|
||||||
func (service *GCSService) ComposeFrom(objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) {
|
func (service *GCSService) ComposeFrom(ctx context.Context, objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) {
|
||||||
dstObj := service.Client.Bucket(dstParams.Bucket).Object(dstParams.ID)
|
dstObj := service.Client.Bucket(dstParams.Bucket).Object(dstParams.ID)
|
||||||
c := dstObj.ComposerFrom(objSrcs...)
|
c := dstObj.ComposerFrom(objSrcs...)
|
||||||
c.ContentType = contentType
|
c.ContentType = contentType
|
||||||
_, err := c.Run(service.Ctx)
|
_, err := c.Run(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
dstAttrs, err := dstObj.Attrs(service.Ctx)
|
dstAttrs, err := dstObj.Attrs(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -325,14 +323,14 @@ func (service *GCSService) ComposeFrom(objSrcs []*storage.ObjectHandle, dstParam
|
||||||
// is zero based. The format [uid]_tmp_[recursion_lvl]_[chunk_idx] can also be used to
|
// is zero based. The format [uid]_tmp_[recursion_lvl]_[chunk_idx] can also be used to
|
||||||
// specify objects that have been composed in a recursive fashion. These different formats
|
// specify objects that have been composed in a recursive fashion. These different formats
|
||||||
// are usedd to ensure that objects are composed in the correct order.
|
// are usedd to ensure that objects are composed in the correct order.
|
||||||
func (service *GCSService) FilterObjects(params GCSFilterParams) ([]string, error) {
|
func (service *GCSService) FilterObjects(ctx context.Context, params GCSFilterParams) ([]string, error) {
|
||||||
bkt := service.Client.Bucket(params.Bucket)
|
bkt := service.Client.Bucket(params.Bucket)
|
||||||
q := storage.Query{
|
q := storage.Query{
|
||||||
Prefix: params.Prefix,
|
Prefix: params.Prefix,
|
||||||
Versions: false,
|
Versions: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
it := bkt.Objects(service.Ctx, &q)
|
it := bkt.Objects(ctx, &q)
|
||||||
names := make([]string, 0)
|
names := make([]string, 0)
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -2,7 +2,7 @@ package gcsstore_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"golang.org/x/net/context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"gopkg.in/h2non/gock.v1"
|
"gopkg.in/h2non/gock.v1"
|
||||||
|
@ -47,10 +47,9 @@ func TestGetObjectSize(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
size, err := service.GetObjectSize(GCSObjectParams{
|
size, err := service.GetObjectSize(ctx, GCSObjectParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
ID: "test-name",
|
ID: "test-name",
|
||||||
})
|
})
|
||||||
|
@ -94,10 +93,9 @@ func TestDeleteObjectWithFilter(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = service.DeleteObjectsWithFilter(GCSFilterParams{
|
err = service.DeleteObjectsWithFilter(ctx, GCSFilterParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
Prefix: "test-prefix",
|
Prefix: "test-prefix",
|
||||||
})
|
})
|
||||||
|
@ -180,10 +178,9 @@ func TestComposeObjects(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = service.ComposeObjects(GCSComposeParams{
|
err = service.ComposeObjects(ctx, GCSComposeParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
Sources: []string{"test1", "test2", "test3"},
|
Sources: []string{"test1", "test2", "test3"},
|
||||||
Destination: "test_all",
|
Destination: "test_all",
|
||||||
|
@ -222,10 +219,9 @@ func TestGetObjectAttrs(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
attrs, err := service.GetObjectAttrs(GCSObjectParams{
|
attrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
ID: "test-name",
|
ID: "test-name",
|
||||||
})
|
})
|
||||||
|
@ -266,10 +262,9 @@ func TestReadObject(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
reader, err := service.ReadObject(GCSObjectParams{
|
reader, err := service.ReadObject(ctx, GCSObjectParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
ID: "test-name",
|
ID: "test-name",
|
||||||
})
|
})
|
||||||
|
@ -304,10 +299,9 @@ func TestSetObjectMetadata(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = service.SetObjectMetadata(GCSObjectParams{
|
err = service.SetObjectMetadata(ctx, GCSObjectParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
ID: "test-name",
|
ID: "test-name",
|
||||||
}, map[string]string{"test": "metadata", "fake": "test"})
|
}, map[string]string{"test": "metadata", "fake": "test"})
|
||||||
|
@ -343,10 +337,9 @@ func TestDeleteObject(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = service.DeleteObject(GCSObjectParams{
|
err = service.DeleteObject(ctx, GCSObjectParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
ID: "test-name",
|
ID: "test-name",
|
||||||
})
|
})
|
||||||
|
@ -376,12 +369,11 @@ func TestWriteObject(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
reader := bytes.NewReader([]byte{1})
|
reader := bytes.NewReader([]byte{1})
|
||||||
|
|
||||||
size, err := service.WriteObject(GCSObjectParams{
|
size, err := service.WriteObject(ctx, GCSObjectParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
ID: "test-name",
|
ID: "test-name",
|
||||||
}, reader)
|
}, reader)
|
||||||
|
@ -428,10 +420,9 @@ func TestComposeFrom(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
crc, err := service.ComposeFrom([]*storage.ObjectHandle{client.Bucket("test-bucket").Object("my-object")}, GCSObjectParams{
|
crc, err := service.ComposeFrom(ctx, []*storage.ObjectHandle{client.Bucket("test-bucket").Object("my-object")}, GCSObjectParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
ID: "my-object",
|
ID: "my-object",
|
||||||
}, "text")
|
}, "text")
|
||||||
|
@ -478,10 +469,9 @@ func TestFilterObject(t *testing.T) {
|
||||||
|
|
||||||
service := GCSService{
|
service := GCSService{
|
||||||
Client: client,
|
Client: client,
|
||||||
Ctx: ctx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
objects, err := service.FilterObjects(GCSFilterParams{
|
objects, err := service.FilterObjects(ctx, GCSFilterParams{
|
||||||
Bucket: "test-bucket",
|
Bucket: "test-bucket",
|
||||||
Prefix: "test-prefix",
|
Prefix: "test-prefix",
|
||||||
})
|
})
|
||||||
|
|
|
@ -14,9 +14,12 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"golang.org/x/net/context"
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"cloud.google.com/go/storage"
|
"cloud.google.com/go/storage"
|
||||||
"github.com/tus/tusd"
|
"github.com/tus/tusd"
|
||||||
|
@ -55,7 +58,8 @@ func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) {
|
||||||
info.ID = uid.Uid()
|
info.ID = uid.Uid()
|
||||||
}
|
}
|
||||||
|
|
||||||
err = store.writeInfo(info.ID, info)
|
ctx := context.Background()
|
||||||
|
err = store.writeInfo(ctx, info.ID, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info.ID, err
|
return info.ID, err
|
||||||
}
|
}
|
||||||
|
@ -70,7 +74,8 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
names, err := store.Service.FilterObjects(filterParams)
|
ctx := context.Background()
|
||||||
|
names, err := store.Service.FilterObjects(ctx, filterParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -95,7 +100,7 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
||||||
ID: cid,
|
ID: cid,
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := store.Service.WriteObject(objectParams, src)
|
n, err := store.Service.WriteObject(ctx, objectParams, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -103,6 +108,8 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const CONCURRENT_SIZE_REQUESTS = 32
|
||||||
|
|
||||||
func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
||||||
info := tusd.FileInfo{}
|
info := tusd.FileInfo{}
|
||||||
i := fmt.Sprintf("%s.info", id)
|
i := fmt.Sprintf("%s.info", id)
|
||||||
|
@ -112,7 +119,8 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
||||||
ID: i,
|
ID: i,
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := store.Service.ReadObject(params)
|
ctx := context.Background()
|
||||||
|
r, err := store.Service.ReadObject(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == storage.ErrObjectNotExist {
|
if err == storage.ErrObjectNotExist {
|
||||||
return info, tusd.ErrNotFound
|
return info, tusd.ErrNotFound
|
||||||
|
@ -136,37 +144,71 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
names, err := store.Service.FilterObjects(filterParams)
|
names, err := store.Service.FilterObjects(ctx, filterParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var offset int64 = 0
|
var offset int64 = 0
|
||||||
|
var firstError error = nil
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
sem := make(chan struct{}, CONCURRENT_SIZE_REQUESTS)
|
||||||
|
errChan := make(chan error)
|
||||||
|
ctxCancel, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for err := range errChan {
|
||||||
|
if err != context.Canceled && firstError == nil {
|
||||||
|
firstError = err
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
|
sem <- struct{}{}
|
||||||
|
wg.Add(1)
|
||||||
params = GCSObjectParams{
|
params = GCSObjectParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
ID: name,
|
ID: name,
|
||||||
}
|
}
|
||||||
|
|
||||||
size, err := store.Service.GetObjectSize(params)
|
go func(params GCSObjectParams) {
|
||||||
|
defer func() {
|
||||||
|
<-sem
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
size, err := store.Service.GetObjectSize(ctxCancel, params)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info, err
|
errChan <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += size
|
atomic.AddInt64(&offset, size)
|
||||||
|
}(params)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(errChan)
|
||||||
|
|
||||||
|
if firstError != nil {
|
||||||
|
return info, firstError
|
||||||
}
|
}
|
||||||
|
|
||||||
info.Offset = offset
|
info.Offset = offset
|
||||||
err = store.writeInfo(id, info)
|
err = store.writeInfo(ctx, id, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return info, nil
|
return info, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store GCSStore) writeInfo(id string, info tusd.FileInfo) error {
|
func (store GCSStore) writeInfo(ctx context.Context, id string, info tusd.FileInfo) error {
|
||||||
data, err := json.Marshal(info)
|
data, err := json.Marshal(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -180,7 +222,7 @@ func (store GCSStore) writeInfo(id string, info tusd.FileInfo) error {
|
||||||
ID: i,
|
ID: i,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = store.Service.WriteObject(params, r)
|
_, err = store.Service.WriteObject(ctx, params, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -195,7 +237,8 @@ func (store GCSStore) FinishUpload(id string) error {
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
names, err := store.Service.FilterObjects(filterParams)
|
ctx := context.Background()
|
||||||
|
names, err := store.Service.FilterObjects(ctx, filterParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -206,12 +249,12 @@ func (store GCSStore) FinishUpload(id string) error {
|
||||||
Sources: names,
|
Sources: names,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = store.Service.ComposeObjects(composeParams)
|
err = store.Service.ComposeObjects(ctx, composeParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = store.Service.DeleteObjectsWithFilter(filterParams)
|
err = store.Service.DeleteObjectsWithFilter(ctx, filterParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -226,7 +269,7 @@ func (store GCSStore) FinishUpload(id string) error {
|
||||||
ID: id,
|
ID: id,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = store.Service.SetObjectMetadata(objectParams, info.MetaData)
|
err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -240,7 +283,8 @@ func (store GCSStore) Terminate(id string) error {
|
||||||
Prefix: id,
|
Prefix: id,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := store.Service.DeleteObjectsWithFilter(filterParams)
|
ctx := context.Background()
|
||||||
|
err := store.Service.DeleteObjectsWithFilter(ctx, filterParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -254,7 +298,8 @@ func (store GCSStore) GetReader(id string) (io.Reader, error) {
|
||||||
ID: id,
|
ID: id,
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := store.Service.ReadObject(params)
|
ctx := context.Background()
|
||||||
|
r, err := store.Service.ReadObject(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
package gcsstore_test
|
package gcsstore_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
context "context"
|
||||||
gomock "github.com/golang/mock/gomock"
|
gomock "github.com/golang/mock/gomock"
|
||||||
gcsstore "github.com/tus/tusd/gcsstore"
|
gcsstore "github.com/tus/tusd/gcsstore"
|
||||||
io "io"
|
io "io"
|
||||||
|
@ -102,86 +103,86 @@ func (_m *MockGCSAPI) EXPECT() *_MockGCSAPIRecorder {
|
||||||
return _m.recorder
|
return _m.recorder
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) ComposeObjects(_param0 gcsstore.GCSComposeParams) error {
|
func (_m *MockGCSAPI) ComposeObjects(_param0 context.Context, _param1 gcsstore.GCSComposeParams) error {
|
||||||
ret := _m.ctrl.Call(_m, "ComposeObjects", _param0)
|
ret := _m.ctrl.Call(_m, "ComposeObjects", _param0, _param1)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) ComposeObjects(arg0 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) ComposeObjects(arg0, arg1 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "ComposeObjects", arg0)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "ComposeObjects", arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) DeleteObject(_param0 gcsstore.GCSObjectParams) error {
|
func (_m *MockGCSAPI) DeleteObject(_param0 context.Context, _param1 gcsstore.GCSObjectParams) error {
|
||||||
ret := _m.ctrl.Call(_m, "DeleteObject", _param0)
|
ret := _m.ctrl.Call(_m, "DeleteObject", _param0, _param1)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) DeleteObject(arg0 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) DeleteObject(arg0, arg1 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObject", arg0)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObject", arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) DeleteObjectsWithFilter(_param0 gcsstore.GCSFilterParams) error {
|
func (_m *MockGCSAPI) DeleteObjectsWithFilter(_param0 context.Context, _param1 gcsstore.GCSFilterParams) error {
|
||||||
ret := _m.ctrl.Call(_m, "DeleteObjectsWithFilter", _param0)
|
ret := _m.ctrl.Call(_m, "DeleteObjectsWithFilter", _param0, _param1)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) DeleteObjectsWithFilter(arg0 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) DeleteObjectsWithFilter(arg0, arg1 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObjectsWithFilter", arg0)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteObjectsWithFilter", arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) FilterObjects(_param0 gcsstore.GCSFilterParams) ([]string, error) {
|
func (_m *MockGCSAPI) FilterObjects(_param0 context.Context, _param1 gcsstore.GCSFilterParams) ([]string, error) {
|
||||||
ret := _m.ctrl.Call(_m, "FilterObjects", _param0)
|
ret := _m.ctrl.Call(_m, "FilterObjects", _param0, _param1)
|
||||||
ret0, _ := ret[0].([]string)
|
ret0, _ := ret[0].([]string)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) FilterObjects(arg0 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) FilterObjects(arg0, arg1 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "FilterObjects", arg0)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "FilterObjects", arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) GetObjectSize(_param0 gcsstore.GCSObjectParams) (int64, error) {
|
func (_m *MockGCSAPI) GetObjectSize(_param0 context.Context, _param1 gcsstore.GCSObjectParams) (int64, error) {
|
||||||
ret := _m.ctrl.Call(_m, "GetObjectSize", _param0)
|
ret := _m.ctrl.Call(_m, "GetObjectSize", _param0, _param1)
|
||||||
ret0, _ := ret[0].(int64)
|
ret0, _ := ret[0].(int64)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) GetObjectSize(arg0 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) GetObjectSize(arg0, arg1 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObjectSize", arg0)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "GetObjectSize", arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) ReadObject(_param0 gcsstore.GCSObjectParams) (gcsstore.GCSReader, error) {
|
func (_m *MockGCSAPI) ReadObject(_param0 context.Context, _param1 gcsstore.GCSObjectParams) (gcsstore.GCSReader, error) {
|
||||||
ret := _m.ctrl.Call(_m, "ReadObject", _param0)
|
ret := _m.ctrl.Call(_m, "ReadObject", _param0, _param1)
|
||||||
ret0, _ := ret[0].(gcsstore.GCSReader)
|
ret0, _ := ret[0].(gcsstore.GCSReader)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) ReadObject(arg0 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) ReadObject(arg0, arg1 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "ReadObject", arg0)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "ReadObject", arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) SetObjectMetadata(_param0 gcsstore.GCSObjectParams, _param1 map[string]string) error {
|
func (_m *MockGCSAPI) SetObjectMetadata(_param0 context.Context, _param1 gcsstore.GCSObjectParams, _param2 map[string]string) error {
|
||||||
ret := _m.ctrl.Call(_m, "SetObjectMetadata", _param0, _param1)
|
ret := _m.ctrl.Call(_m, "SetObjectMetadata", _param0, _param1, _param2)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) SetObjectMetadata(arg0, arg1 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) SetObjectMetadata(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetObjectMetadata", arg0, arg1)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetObjectMetadata", arg0, arg1, arg2)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_m *MockGCSAPI) WriteObject(_param0 gcsstore.GCSObjectParams, _param1 io.Reader) (int64, error) {
|
func (_m *MockGCSAPI) WriteObject(_param0 context.Context, _param1 gcsstore.GCSObjectParams, _param2 io.Reader) (int64, error) {
|
||||||
ret := _m.ctrl.Call(_m, "WriteObject", _param0, _param1)
|
ret := _m.ctrl.Call(_m, "WriteObject", _param0, _param1, _param2)
|
||||||
ret0, _ := ret[0].(int64)
|
ret0, _ := ret[0].(int64)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_mr *_MockGCSAPIRecorder) WriteObject(arg0, arg1 interface{}) *gomock.Call {
|
func (_mr *_MockGCSAPIRecorder) WriteObject(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "WriteObject", arg0, arg1)
|
return _mr.mock.ctrl.RecordCall(_mr.mock, "WriteObject", arg0, arg1, arg2)
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"golang.org/x/net/context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"cloud.google.com/go/storage"
|
"cloud.google.com/go/storage"
|
||||||
|
@ -55,7 +56,8 @@ func TestNewUpload(t *testing.T) {
|
||||||
ID: fmt.Sprintf("%s.info", mockID),
|
ID: fmt.Sprintf("%s.info", mockID),
|
||||||
}
|
}
|
||||||
|
|
||||||
service.EXPECT().WriteObject(params, r).Return(int64(r.Len()), nil)
|
ctx := context.Background()
|
||||||
|
service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil)
|
||||||
|
|
||||||
id, err := store.NewUpload(mockTusdInfo)
|
id, err := store.NewUpload(mockTusdInfo)
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
|
@ -130,15 +132,19 @@ func TestGetInfo(t *testing.T) {
|
||||||
|
|
||||||
infoR := bytes.NewReader(offsetInfoData)
|
infoR := bytes.NewReader(offsetInfoData)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
service.EXPECT().ReadObject(params).Return(r, nil),
|
service.EXPECT().ReadObject(ctx, params).Return(r, nil),
|
||||||
service.EXPECT().FilterObjects(filterParams).Return(mockPartials, nil),
|
service.EXPECT().FilterObjects(ctx, filterParams).Return(mockPartials, nil),
|
||||||
service.EXPECT().GetObjectSize(mockObjectParams0).Return(size, nil),
|
|
||||||
service.EXPECT().GetObjectSize(mockObjectParams1).Return(size, nil),
|
|
||||||
service.EXPECT().GetObjectSize(mockObjectParams2).Return(size, nil),
|
|
||||||
service.EXPECT().WriteObject(params, infoR).Return(int64(len(offsetInfoData)), nil),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ctxCancel, _ := context.WithCancel(ctx)
|
||||||
|
service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams0).Return(size, nil)
|
||||||
|
service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams1).Return(size, nil)
|
||||||
|
lastGetObjectSize := service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams2).Return(size, nil)
|
||||||
|
|
||||||
|
service.EXPECT().WriteObject(ctx, params, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize)
|
||||||
|
|
||||||
info, err := store.GetInfo(mockID)
|
info, err := store.GetInfo(mockID)
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
assert.Equal(mockTusdInfo, info)
|
assert.Equal(mockTusdInfo, info)
|
||||||
|
@ -157,8 +163,9 @@ func TestGetInfoNotFound(t *testing.T) {
|
||||||
ID: fmt.Sprintf("%s.info", mockID),
|
ID: fmt.Sprintf("%s.info", mockID),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
service.EXPECT().ReadObject(params).Return(nil, storage.ErrObjectNotExist),
|
service.EXPECT().ReadObject(ctx, params).Return(nil, storage.ErrObjectNotExist),
|
||||||
)
|
)
|
||||||
|
|
||||||
_, err := store.GetInfo(mockID)
|
_, err := store.GetInfo(mockID)
|
||||||
|
@ -205,7 +212,8 @@ func TestGetReader(t *testing.T) {
|
||||||
|
|
||||||
r := MockGetReader{}
|
r := MockGetReader{}
|
||||||
|
|
||||||
service.EXPECT().ReadObject(params).Return(r, nil)
|
ctx := context.Background()
|
||||||
|
service.EXPECT().ReadObject(ctx, params).Return(r, nil)
|
||||||
reader, err := store.GetReader(mockID)
|
reader, err := store.GetReader(mockID)
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
|
|
||||||
|
@ -231,7 +239,8 @@ func TestTerminate(t *testing.T) {
|
||||||
Prefix: mockID,
|
Prefix: mockID,
|
||||||
}
|
}
|
||||||
|
|
||||||
service.EXPECT().DeleteObjectsWithFilter(filterParams).Return(nil)
|
ctx := context.Background()
|
||||||
|
service.EXPECT().DeleteObjectsWithFilter(ctx, filterParams).Return(nil)
|
||||||
|
|
||||||
err := store.Terminate(mockID)
|
err := store.Terminate(mockID)
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
|
@ -302,19 +311,23 @@ func TestFinishUpload(t *testing.T) {
|
||||||
"foo": "bar",
|
"foo": "bar",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
service.EXPECT().FilterObjects(filterParams).Return(mockPartials, nil),
|
service.EXPECT().FilterObjects(ctx, filterParams).Return(mockPartials, nil),
|
||||||
service.EXPECT().ComposeObjects(composeParams).Return(nil),
|
service.EXPECT().ComposeObjects(ctx, composeParams).Return(nil),
|
||||||
service.EXPECT().DeleteObjectsWithFilter(filterParams).Return(nil),
|
service.EXPECT().DeleteObjectsWithFilter(ctx, filterParams).Return(nil),
|
||||||
service.EXPECT().ReadObject(infoParams).Return(r, nil),
|
service.EXPECT().ReadObject(ctx, infoParams).Return(r, nil),
|
||||||
service.EXPECT().FilterObjects(filterParams2).Return(mockPartials, nil),
|
service.EXPECT().FilterObjects(ctx, filterParams2).Return(mockPartials, nil),
|
||||||
service.EXPECT().GetObjectSize(mockObjectParams0).Return(size, nil),
|
|
||||||
service.EXPECT().GetObjectSize(mockObjectParams1).Return(size, nil),
|
|
||||||
service.EXPECT().GetObjectSize(mockObjectParams2).Return(size, nil),
|
|
||||||
service.EXPECT().WriteObject(infoParams, infoR).Return(int64(len(offsetInfoData)), nil),
|
|
||||||
service.EXPECT().SetObjectMetadata(objectParams, metadata).Return(nil),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ctxCancel, _ := context.WithCancel(ctx)
|
||||||
|
service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams0).Return(size, nil)
|
||||||
|
service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams1).Return(size, nil)
|
||||||
|
lastGetObjectSize := service.EXPECT().GetObjectSize(ctxCancel, mockObjectParams2).Return(size, nil)
|
||||||
|
|
||||||
|
writeObject := service.EXPECT().WriteObject(ctx, infoParams, infoR).Return(int64(len(offsetInfoData)), nil).After(lastGetObjectSize)
|
||||||
|
service.EXPECT().SetObjectMetadata(ctx, objectParams, metadata).Return(nil).After(writeObject)
|
||||||
|
|
||||||
err = store.FinishUpload(mockID)
|
err = store.FinishUpload(mockID)
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
}
|
}
|
||||||
|
@ -378,9 +391,10 @@ func TestWriteChunk(t *testing.T) {
|
||||||
|
|
||||||
rGet := bytes.NewReader([]byte(mockReaderData))
|
rGet := bytes.NewReader([]byte(mockReaderData))
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
service.EXPECT().FilterObjects(filterParams).Return(partials, nil),
|
service.EXPECT().FilterObjects(ctx, filterParams).Return(partials, nil),
|
||||||
service.EXPECT().WriteObject(writeObjectParams, rGet).Return(int64(len(mockReaderData)), nil),
|
service.EXPECT().WriteObject(ctx, writeObjectParams, rGet).Return(int64(len(mockReaderData)), nil),
|
||||||
)
|
)
|
||||||
|
|
||||||
reader := bytes.NewReader([]byte(mockReaderData))
|
reader := bytes.NewReader([]byte(mockReaderData))
|
||||||
|
|
Loading…
Reference in New Issue