Pass contexts to GCSService. Add support for concurrent GetObjectSize requests in GetInfo

This commit is contained in:
Tom Berger 2018-07-25 10:25:59 -04:00
parent 93d83ebdd8
commit a1273df6cd
2 changed files with 105 additions and 62 deletions

View File

@ -59,14 +59,14 @@ type GCSReader interface {
// operations that are required to enable the tus protocol // operations that are required to enable the tus protocol
// to work with Google's cloud storage. // to work with Google's cloud storage.
type GCSAPI interface { type GCSAPI interface {
ReadObject(params GCSObjectParams) (GCSReader, error) ReadObject(ctx context.Context, params GCSObjectParams) (GCSReader, error)
GetObjectSize(params GCSObjectParams) (int64, error) GetObjectSize(ctx context.Context, params GCSObjectParams) (int64, error)
SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error SetObjectMetadata(ctx context.Context, params GCSObjectParams, metadata map[string]string) error
DeleteObject(params GCSObjectParams) error DeleteObject(ctx context.Context, params GCSObjectParams) error
DeleteObjectsWithFilter(params GCSFilterParams) error DeleteObjectsWithFilter(ctx context.Context, params GCSFilterParams) error
WriteObject(params GCSObjectParams, r io.Reader) (int64, error) WriteObject(ctx context.Context, params GCSObjectParams, r io.Reader) (int64, error)
ComposeObjects(params GCSComposeParams) error ComposeObjects(ctx context.Context, params GCSComposeParams) error
FilterObjects(params GCSFilterParams) ([]string, error) FilterObjects(ctx context.Context, params GCSFilterParams) ([]string, error)
} }
// GCSService holds the cloud.google.com/go/storage client // GCSService holds the cloud.google.com/go/storage client
@ -75,7 +75,6 @@ type GCSAPI interface {
// The usage of these closures allow them to be redefined in the testing package, allowing test to be run against this file. // The usage of these closures allow them to be redefined in the testing package, allowing test to be run against this file.
type GCSService struct { type GCSService struct {
Client *storage.Client Client *storage.Client
Ctx context.Context
} }
// NewGCSService returns a GCSSerivce object given a GCloud service account file path. // NewGCSService returns a GCSSerivce object given a GCloud service account file path.
@ -88,15 +87,14 @@ func NewGCSService(filename string) (*GCSService, error) {
service := &GCSService{ service := &GCSService{
Client: client, Client: client,
Ctx: ctx,
} }
return service, nil return service, nil
} }
// GetObjectSize returns the byte length of the specified GCS object. // GetObjectSize returns the byte length of the specified GCS object.
func (service *GCSService) GetObjectSize(params GCSObjectParams) (int64, error) { func (service *GCSService) GetObjectSize(ctx context.Context, params GCSObjectParams) (int64, error) {
attrs, err := service.GetObjectAttrs(params) attrs, err := service.GetObjectAttrs(ctx, params)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -105,8 +103,8 @@ func (service *GCSService) GetObjectSize(params GCSObjectParams) (int64, error)
} }
// DeleteObjectWithPrefix will delete objects who match the provided filter parameters. // DeleteObjectWithPrefix will delete objects who match the provided filter parameters.
func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error { func (service *GCSService) DeleteObjectsWithFilter(ctx context.Context, params GCSFilterParams) error {
names, err := service.FilterObjects(params) names, err := service.FilterObjects(ctx, params)
if err != nil { if err != nil {
return err return err
} }
@ -118,7 +116,7 @@ func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error
ID: name, ID: name,
} }
err := service.DeleteObject(objectParams) err := service.DeleteObject(ctx, objectParams)
if err != nil { if err != nil {
return err return err
} }
@ -130,7 +128,7 @@ func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error
const COMPOSE_RETRIES = 3 const COMPOSE_RETRIES = 3
// Compose takes a bucket name, a list of initial source names, and a destination string to compose multiple GCS objects together // Compose takes a bucket name, a list of initial source names, and a destination string to compose multiple GCS objects together
func (service *GCSService) compose(bucket string, srcs []string, dst string) error { func (service *GCSService) compose(ctx context.Context, bucket string, srcs []string, dst string) error {
dstParams := GCSObjectParams{ dstParams := GCSObjectParams{
Bucket: bucket, Bucket: bucket,
ID: dst, ID: dst,
@ -139,7 +137,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
var crc uint32 var crc uint32
for i := 0; i < len(srcs); i++ { for i := 0; i < len(srcs); i++ {
objSrcs[i] = service.Client.Bucket(bucket).Object(srcs[i]) objSrcs[i] = service.Client.Bucket(bucket).Object(srcs[i])
srcAttrs, err := service.GetObjectAttrs(GCSObjectParams{ srcAttrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{
Bucket: bucket, Bucket: bucket,
ID: srcs[i], ID: srcs[i],
}) })
@ -154,7 +152,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
} }
} }
attrs, err := service.GetObjectAttrs(GCSObjectParams{ attrs, err := service.GetObjectAttrs(ctx, GCSObjectParams{
Bucket: bucket, Bucket: bucket,
ID: srcs[0], ID: srcs[0],
}) })
@ -163,7 +161,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
} }
for i := 0; i < COMPOSE_RETRIES; i++ { for i := 0; i < COMPOSE_RETRIES; i++ {
dstCRC, err := service.ComposeFrom(objSrcs, dstParams, attrs.ContentType) dstCRC, err := service.ComposeFrom(ctx, objSrcs, dstParams, attrs.ContentType)
if err != nil { if err != nil {
return err return err
} }
@ -173,7 +171,7 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
} }
} }
err = service.DeleteObject(GCSObjectParams{ err = service.DeleteObject(ctx, GCSObjectParams{
Bucket: bucket, Bucket: bucket,
ID: dst, ID: dst,
}) })
@ -190,9 +188,9 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
// can combined in a compose operation. GCloud storage's limit is 32. // can combined in a compose operation. GCloud storage's limit is 32.
const MAX_OBJECT_COMPOSITION = 32 const MAX_OBJECT_COMPOSITION = 32
func (service *GCSService) recursiveCompose(srcs []string, params GCSComposeParams, lvl int) error { func (service *GCSService) recursiveCompose(ctx context.Context, srcs []string, params GCSComposeParams, lvl int) error {
if len(srcs) <= MAX_OBJECT_COMPOSITION { if len(srcs) <= MAX_OBJECT_COMPOSITION {
err := service.compose(params.Bucket, srcs, params.Destination) err := service.compose(ctx, params.Bucket, srcs, params.Destination)
if err != nil { if err != nil {
return err return err
} }
@ -204,7 +202,7 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara
Prefix: prefix, Prefix: prefix,
} }
err = service.DeleteObjectsWithFilter(filterParams) err = service.DeleteObjectsWithFilter(ctx, filterParams)
if err != nil { if err != nil {
return err return err
} }
@ -223,7 +221,7 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara
} }
tmpDst := fmt.Sprintf("%s_tmp_%d_%d", params.Destination, lvl, i) tmpDst := fmt.Sprintf("%s_tmp_%d_%d", params.Destination, lvl, i)
err := service.compose(params.Bucket, srcs[start:end], tmpDst) err := service.compose(ctx, params.Bucket, srcs[start:end], tmpDst)
if err != nil { if err != nil {
return err return err
} }
@ -231,15 +229,15 @@ func (service *GCSService) recursiveCompose(srcs []string, params GCSComposePara
tmpSrcs[i] = tmpDst tmpSrcs[i] = tmpDst
} }
return service.recursiveCompose(tmpSrcs, params, lvl+1) return service.recursiveCompose(ctx, tmpSrcs, params, lvl+1)
} }
// ComposeObjects composes multiple GCS objects in to a single object. // ComposeObjects composes multiple GCS objects in to a single object.
// Since GCS limits composition to a max of 32 objects, additional logic // Since GCS limits composition to a max of 32 objects, additional logic
// has been added to chunk objects in to groups of 32 and then recursively // has been added to chunk objects in to groups of 32 and then recursively
// compose those objects together. // compose those objects together.
func (service *GCSService) ComposeObjects(params GCSComposeParams) error { func (service *GCSService) ComposeObjects(ctx context.Context, params GCSComposeParams) error {
err := service.recursiveCompose(params.Sources, params, 0) err := service.recursiveCompose(ctx, params.Sources, params, 0)
if err != nil { if err != nil {
return err return err
@ -249,10 +247,10 @@ func (service *GCSService) ComposeObjects(params GCSComposeParams) error {
} }
// GetObjectAttrs returns the associated attributes of a GCS object. See: https://godoc.org/cloud.google.com/go/storage#ObjectAttrs // GetObjectAttrs returns the associated attributes of a GCS object. See: https://godoc.org/cloud.google.com/go/storage#ObjectAttrs
func (service *GCSService) GetObjectAttrs(params GCSObjectParams) (*storage.ObjectAttrs, error) { func (service *GCSService) GetObjectAttrs(ctx context.Context, params GCSObjectParams) (*storage.ObjectAttrs, error) {
obj := service.Client.Bucket(params.Bucket).Object(params.ID) obj := service.Client.Bucket(params.Bucket).Object(params.ID)
attrs, err := obj.Attrs(service.Ctx) attrs, err := obj.Attrs(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -262,8 +260,8 @@ func (service *GCSService) GetObjectAttrs(params GCSObjectParams) (*storage.Obje
} }
// ReadObject reads a GCSObjectParams, returning a GCSReader object if successful, and an error otherwise // ReadObject reads a GCSObjectParams, returning a GCSReader object if successful, and an error otherwise
func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error) { func (service *GCSService) ReadObject(ctx context.Context, params GCSObjectParams) (GCSReader, error) {
r, err := service.Client.Bucket(params.Bucket).Object(params.ID).NewReader(service.Ctx) r, err := service.Client.Bucket(params.Bucket).Object(params.ID).NewReader(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -272,25 +270,25 @@ func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error)
} }
// SetObjectMetadata reads a GCSObjectParams and a map of metedata, returning a nil on sucess and an error otherwise // SetObjectMetadata reads a GCSObjectParams and a map of metedata, returning a nil on sucess and an error otherwise
func (service *GCSService) SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error { func (service *GCSService) SetObjectMetadata(ctx context.Context, params GCSObjectParams, metadata map[string]string) error {
attrs := storage.ObjectAttrsToUpdate{ attrs := storage.ObjectAttrsToUpdate{
Metadata: metadata, Metadata: metadata,
} }
_, err := service.Client.Bucket(params.Bucket).Object(params.ID).Update(service.Ctx, attrs) _, err := service.Client.Bucket(params.Bucket).Object(params.ID).Update(ctx, attrs)
return err return err
} }
// DeleteObject deletes the object defined by GCSObjectParams // DeleteObject deletes the object defined by GCSObjectParams
func (service *GCSService) DeleteObject(params GCSObjectParams) error { func (service *GCSService) DeleteObject(ctx context.Context, params GCSObjectParams) error {
return service.Client.Bucket(params.Bucket).Object(params.ID).Delete(service.Ctx) return service.Client.Bucket(params.Bucket).Object(params.ID).Delete(ctx)
} }
// Write object writes the file set out by the GCSObjectParams // Write object writes the file set out by the GCSObjectParams
func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int64, error) { func (service *GCSService) WriteObject(ctx context.Context, params GCSObjectParams, r io.Reader) (int64, error) {
obj := service.Client.Bucket(params.Bucket).Object(params.ID) obj := service.Client.Bucket(params.Bucket).Object(params.ID)
w := obj.NewWriter(service.Ctx) w := obj.NewWriter(ctx)
defer w.Close() defer w.Close()
@ -303,16 +301,16 @@ func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int
} }
// ComposeFrom composes multiple object types together, // ComposeFrom composes multiple object types together,
func (service *GCSService) ComposeFrom(objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) { func (service *GCSService) ComposeFrom(ctx context.Context, objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) {
dstObj := service.Client.Bucket(dstParams.Bucket).Object(dstParams.ID) dstObj := service.Client.Bucket(dstParams.Bucket).Object(dstParams.ID)
c := dstObj.ComposerFrom(objSrcs...) c := dstObj.ComposerFrom(objSrcs...)
c.ContentType = contentType c.ContentType = contentType
_, err := c.Run(service.Ctx) _, err := c.Run(ctx)
if err != nil { if err != nil {
return 0, err return 0, err
} }
dstAttrs, err := dstObj.Attrs(service.Ctx) dstAttrs, err := dstObj.Attrs(ctx)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -325,14 +323,14 @@ func (service *GCSService) ComposeFrom(objSrcs []*storage.ObjectHandle, dstParam
// is zero based. The format [uid]_tmp_[recursion_lvl]_[chunk_idx] can also be used to // is zero based. The format [uid]_tmp_[recursion_lvl]_[chunk_idx] can also be used to
// specify objects that have been composed in a recursive fashion. These different formats // specify objects that have been composed in a recursive fashion. These different formats
// are usedd to ensure that objects are composed in the correct order. // are usedd to ensure that objects are composed in the correct order.
func (service *GCSService) FilterObjects(params GCSFilterParams) ([]string, error) { func (service *GCSService) FilterObjects(ctx context.Context, params GCSFilterParams) ([]string, error) {
bkt := service.Client.Bucket(params.Bucket) bkt := service.Client.Bucket(params.Bucket)
q := storage.Query{ q := storage.Query{
Prefix: params.Prefix, Prefix: params.Prefix,
Versions: false, Versions: false,
} }
it := bkt.Objects(service.Ctx, &q) it := bkt.Objects(ctx, &q)
names := make([]string, 0) names := make([]string, 0)
loop: loop:
for { for {

View File

@ -14,9 +14,12 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"golang.org/x/net/context"
"io" "io"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic"
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
"github.com/tus/tusd" "github.com/tus/tusd"
@ -55,7 +58,8 @@ func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) {
info.ID = uid.Uid() info.ID = uid.Uid()
} }
err = store.writeInfo(info.ID, info) ctx := context.Background()
err = store.writeInfo(ctx, info.ID, info)
if err != nil { if err != nil {
return info.ID, err return info.ID, err
} }
@ -70,7 +74,8 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64,
Prefix: prefix, Prefix: prefix,
} }
names, err := store.Service.FilterObjects(filterParams) ctx := context.Background()
names, err := store.Service.FilterObjects(ctx, filterParams)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -95,7 +100,7 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64,
ID: cid, ID: cid,
} }
n, err := store.Service.WriteObject(objectParams, src) n, err := store.Service.WriteObject(ctx, objectParams, src)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -103,6 +108,8 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64,
return n, err return n, err
} }
const CONCURRENT_SIZE_REQUESTS = 32
func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
info := tusd.FileInfo{} info := tusd.FileInfo{}
i := fmt.Sprintf("%s.info", id) i := fmt.Sprintf("%s.info", id)
@ -112,7 +119,10 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
ID: i, ID: i,
} }
r, err := store.Service.ReadObject(params) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r, err := store.Service.ReadObject(ctx, params)
if err != nil { if err != nil {
if err == storage.ErrObjectNotExist { if err == storage.ErrObjectNotExist {
return info, tusd.ErrNotFound return info, tusd.ErrNotFound
@ -136,37 +146,69 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
Prefix: prefix, Prefix: prefix,
} }
names, err := store.Service.FilterObjects(filterParams) names, err := store.Service.FilterObjects(ctx, filterParams)
if err != nil { if err != nil {
return info, err return info, err
} }
var offset int64 = 0 var offset int64 = 0
var firstError error = nil
var wg sync.WaitGroup
sem := make(chan struct{}, CONCURRENT_SIZE_REQUESTS)
errChan := make(chan error)
go func() {
for err := range errChan {
if err != context.Canceled && firstError == nil {
firstError = err
cancel()
}
}
}()
for _, name := range names { for _, name := range names {
sem <- struct{}{}
wg.Add(1)
params = GCSObjectParams{ params = GCSObjectParams{
Bucket: store.Bucket, Bucket: store.Bucket,
ID: name, ID: name,
} }
size, err := store.Service.GetObjectSize(params) go func() {
defer func() {
<-sem
wg.Done()
}()
size, err := store.Service.GetObjectSize(ctx, params)
if err != nil { if err != nil {
return info, err errChan <- err
return
} }
offset += size atomic.AddInt64(&offset, size)
}()
}
wg.Wait()
close(errChan)
if firstError != nil {
return info, firstError
} }
info.Offset = offset info.Offset = offset
err = store.writeInfo(id, info) err = store.writeInfo(ctx, id, info)
if err != nil { if err != nil {
return info, err return info, err
} }
return info, nil return info, nil
} }
func (store GCSStore) writeInfo(id string, info tusd.FileInfo) error { func (store GCSStore) writeInfo(ctx context.Context, id string, info tusd.FileInfo) error {
data, err := json.Marshal(info) data, err := json.Marshal(info)
if err != nil { if err != nil {
return err return err
@ -180,7 +222,7 @@ func (store GCSStore) writeInfo(id string, info tusd.FileInfo) error {
ID: i, ID: i,
} }
_, err = store.Service.WriteObject(params, r) _, err = store.Service.WriteObject(ctx, params, r)
if err != nil { if err != nil {
return err return err
} }
@ -195,7 +237,8 @@ func (store GCSStore) FinishUpload(id string) error {
Prefix: prefix, Prefix: prefix,
} }
names, err := store.Service.FilterObjects(filterParams) ctx := context.Background()
names, err := store.Service.FilterObjects(ctx, filterParams)
if err != nil { if err != nil {
return err return err
} }
@ -206,12 +249,12 @@ func (store GCSStore) FinishUpload(id string) error {
Sources: names, Sources: names,
} }
err = store.Service.ComposeObjects(composeParams) err = store.Service.ComposeObjects(ctx, composeParams)
if err != nil { if err != nil {
return err return err
} }
err = store.Service.DeleteObjectsWithFilter(filterParams) err = store.Service.DeleteObjectsWithFilter(ctx, filterParams)
if err != nil { if err != nil {
return err return err
} }
@ -226,7 +269,7 @@ func (store GCSStore) FinishUpload(id string) error {
ID: id, ID: id,
} }
err = store.Service.SetObjectMetadata(objectParams, info.MetaData) err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData)
if err != nil { if err != nil {
return err return err
} }
@ -240,7 +283,8 @@ func (store GCSStore) Terminate(id string) error {
Prefix: id, Prefix: id,
} }
err := store.Service.DeleteObjectsWithFilter(filterParams) ctx := context.Background()
err := store.Service.DeleteObjectsWithFilter(ctx, filterParams)
if err != nil { if err != nil {
return err return err
} }
@ -254,7 +298,8 @@ func (store GCSStore) GetReader(id string) (io.Reader, error) {
ID: id, ID: id,
} }
r, err := store.Service.ReadObject(params) ctx := context.Background()
r, err := store.Service.ReadObject(ctx, params)
if err != nil { if err != nil {
return nil, err return nil, err
} }