gcsstore: Add ability to set custom object prefix (#275)
Squashed commit of the following: commit e48ca3f3fe086504aa1a97d26e2f4fe263880664 Author: Marius <maerious@gmail.com> Date: Sun Jun 2 15:54:39 2019 +0200 Format Go source code commit477ef689d3
Merge:82c50f9
b89c337
Author: Ridho Azhar <azharridho42@gmail.com> Date: Mon May 27 15:56:20 2019 +0700 Merge branch 'master' into master commit82c50f9364
Author: ridhozhr <ridho@nodeflux.io> Date: Mon May 27 13:30:57 2019 +0700 add test file with prefix commitaa8a29866f
Author: ridhozhr <ridho@nodeflux.io> Date: Mon May 27 13:18:08 2019 +0700 remove object prefix gcs from parameter commite25b36c5e9
Author: ridhozhr <ridho@nodeflux.io> Date: Wed May 22 22:19:01 2019 +0700 add flags gcs object prefix validation commit53762be170
Author: ridhozhr <ridho@nodeflux.io> Date: Wed May 22 22:04:17 2019 +0700 integrate prefix with store method commitfe62533f1e
Author: ridhozhr <ridho@nodeflux.io> Date: Wed May 22 21:03:25 2019 +0700 add prefix in test file gcs store commite824008fe2
Author: ridhozhr <ridho@nodeflux.io> Date: Wed May 22 20:57:32 2019 +0700 integrate flags with composer gcs object prefix commitbb2ee4cf41
Author: ridhozhr <ridho@nodeflux.io> Date: Wed May 22 20:54:38 2019 +0700 add gcs-object-prefix flag commit600f4fc939
Author: ridhozhr <ridho@nodeflux.io> Date: Wed May 22 20:54:14 2019 +0700 add object prefix in gcs store
This commit is contained in:
parent
b89c337b1b
commit
53b86abc35
|
@ -56,6 +56,7 @@ func CreateComposer() {
|
||||||
stdout.Printf("Using 'gcs://%s' as GCS bucket for storage.\n", Flags.GCSBucket)
|
stdout.Printf("Using 'gcs://%s' as GCS bucket for storage.\n", Flags.GCSBucket)
|
||||||
|
|
||||||
store := gcsstore.New(Flags.GCSBucket, service)
|
store := gcsstore.New(Flags.GCSBucket, service)
|
||||||
|
store.ObjectPrefix = Flags.GCSObjectPrefix
|
||||||
store.UseIn(Composer)
|
store.UseIn(Composer)
|
||||||
|
|
||||||
locker := memorylocker.New()
|
locker := memorylocker.New()
|
||||||
|
|
|
@ -3,6 +3,7 @@ package cli
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
var Flags struct {
|
var Flags struct {
|
||||||
|
@ -18,11 +19,11 @@ var Flags struct {
|
||||||
S3ObjectPrefix string
|
S3ObjectPrefix string
|
||||||
S3Endpoint string
|
S3Endpoint string
|
||||||
GCSBucket string
|
GCSBucket string
|
||||||
|
GCSObjectPrefix string
|
||||||
FileHooksDir string
|
FileHooksDir string
|
||||||
HttpHooksEndpoint string
|
HttpHooksEndpoint string
|
||||||
HttpHooksRetry int
|
HttpHooksRetry int
|
||||||
HttpHooksBackoff int
|
HttpHooksBackoff int
|
||||||
HooksStopUploadCode int
|
|
||||||
ShowVersion bool
|
ShowVersion bool
|
||||||
ExposeMetrics bool
|
ExposeMetrics bool
|
||||||
MetricsPath string
|
MetricsPath string
|
||||||
|
@ -45,6 +46,7 @@ func ParseFlags() {
|
||||||
flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names")
|
flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names")
|
||||||
flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be pass)")
|
flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be pass)")
|
||||||
flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)")
|
flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)")
|
||||||
|
flag.StringVar(&Flags.GCSObjectPrefix, "gcs-object-prefix", "", "Prefix for GCS object names (can't contain underscore character)")
|
||||||
flag.StringVar(&Flags.FileHooksDir, "hooks-dir", "", "Directory to search for available hooks scripts")
|
flag.StringVar(&Flags.FileHooksDir, "hooks-dir", "", "Directory to search for available hooks scripts")
|
||||||
flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to")
|
flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to")
|
||||||
flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout")
|
flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout")
|
||||||
|
@ -76,4 +78,9 @@ func ParseFlags() {
|
||||||
"neither flag was provided. Please consult `tusd -help` for " +
|
"neither flag was provided. Please consult `tusd -help` for " +
|
||||||
"more information on these options.")
|
"more information on these options.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if Flags.GCSObjectPrefix != "" && strings.Contains(Flags.GCSObjectPrefix, "_") {
|
||||||
|
stderr.Fatalf("gcs-object-prefix value (%s) can't contain underscore. "+
|
||||||
|
"Please remove underscore from the value", Flags.GCSObjectPrefix)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,13 +14,14 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"golang.org/x/net/context"
|
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"cloud.google.com/go/storage"
|
"cloud.google.com/go/storage"
|
||||||
"github.com/tus/tusd"
|
"github.com/tus/tusd"
|
||||||
"github.com/tus/tusd/uid"
|
"github.com/tus/tusd/uid"
|
||||||
|
@ -32,6 +33,11 @@ type GCSStore struct {
|
||||||
// Specifies the GCS bucket that uploads will be stored in
|
// Specifies the GCS bucket that uploads will be stored in
|
||||||
Bucket string
|
Bucket string
|
||||||
|
|
||||||
|
// ObjectPrefix is prepended to the name of each GCS object that is created.
|
||||||
|
// It can be used to create a pseudo-directory structure in the bucket,
|
||||||
|
// e.g. "path/to/my/uploads".
|
||||||
|
ObjectPrefix string
|
||||||
|
|
||||||
// Service specifies an interface used to communicate with the Google
|
// Service specifies an interface used to communicate with the Google
|
||||||
// cloud storage backend. Implementation can be seen in gcsservice file.
|
// cloud storage backend. Implementation can be seen in gcsservice file.
|
||||||
Service GCSAPI
|
Service GCSAPI
|
||||||
|
@ -59,7 +65,7 @@ func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
err = store.writeInfo(ctx, info.ID, info)
|
err = store.writeInfo(ctx, store.keyWithPrefix(info.ID), info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info.ID, err
|
return info.ID, err
|
||||||
}
|
}
|
||||||
|
@ -68,7 +74,7 @@ func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
|
func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
|
||||||
prefix := fmt.Sprintf("%s_", id)
|
prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
|
||||||
filterParams := GCSFilterParams{
|
filterParams := GCSFilterParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
|
@ -94,7 +100,7 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cid := fmt.Sprintf("%s_%d", id, maxIdx+1)
|
cid := fmt.Sprintf("%s_%d", store.keyWithPrefix(id), maxIdx+1)
|
||||||
objectParams := GCSObjectParams{
|
objectParams := GCSObjectParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
ID: cid,
|
ID: cid,
|
||||||
|
@ -112,7 +118,7 @@ const CONCURRENT_SIZE_REQUESTS = 32
|
||||||
|
|
||||||
func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
||||||
info := tusd.FileInfo{}
|
info := tusd.FileInfo{}
|
||||||
i := fmt.Sprintf("%s.info", id)
|
i := fmt.Sprintf("%s.info", store.keyWithPrefix(id))
|
||||||
|
|
||||||
params := GCSObjectParams{
|
params := GCSObjectParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
|
@ -138,7 +144,7 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
prefix := fmt.Sprintf("%s", id)
|
prefix := fmt.Sprintf("%s", store.keyWithPrefix(id))
|
||||||
filterParams := GCSFilterParams{
|
filterParams := GCSFilterParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
|
@ -200,7 +206,7 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
info.Offset = offset
|
info.Offset = offset
|
||||||
err = store.writeInfo(ctx, id, info)
|
err = store.writeInfo(ctx, store.keyWithPrefix(id), info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
|
@ -231,7 +237,7 @@ func (store GCSStore) writeInfo(ctx context.Context, id string, info tusd.FileIn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store GCSStore) FinishUpload(id string) error {
|
func (store GCSStore) FinishUpload(id string) error {
|
||||||
prefix := fmt.Sprintf("%s_", id)
|
prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id))
|
||||||
filterParams := GCSFilterParams{
|
filterParams := GCSFilterParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
|
@ -245,7 +251,7 @@ func (store GCSStore) FinishUpload(id string) error {
|
||||||
|
|
||||||
composeParams := GCSComposeParams{
|
composeParams := GCSComposeParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
Destination: id,
|
Destination: store.keyWithPrefix(id),
|
||||||
Sources: names,
|
Sources: names,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +272,7 @@ func (store GCSStore) FinishUpload(id string) error {
|
||||||
|
|
||||||
objectParams := GCSObjectParams{
|
objectParams := GCSObjectParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
ID: id,
|
ID: store.keyWithPrefix(id),
|
||||||
}
|
}
|
||||||
|
|
||||||
err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData)
|
err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData)
|
||||||
|
@ -280,7 +286,7 @@ func (store GCSStore) FinishUpload(id string) error {
|
||||||
func (store GCSStore) Terminate(id string) error {
|
func (store GCSStore) Terminate(id string) error {
|
||||||
filterParams := GCSFilterParams{
|
filterParams := GCSFilterParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
Prefix: id,
|
Prefix: store.keyWithPrefix(id),
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
@ -295,7 +301,7 @@ func (store GCSStore) Terminate(id string) error {
|
||||||
func (store GCSStore) GetReader(id string) (io.Reader, error) {
|
func (store GCSStore) GetReader(id string) (io.Reader, error) {
|
||||||
params := GCSObjectParams{
|
params := GCSObjectParams{
|
||||||
Bucket: store.Bucket,
|
Bucket: store.Bucket,
|
||||||
ID: id,
|
ID: store.keyWithPrefix(id),
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
@ -306,3 +312,11 @@ func (store GCSStore) GetReader(id string) (io.Reader, error) {
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (store GCSStore) keyWithPrefix(key string) string {
|
||||||
|
prefix := store.ObjectPrefix
|
||||||
|
if prefix != "" && !strings.HasSuffix(prefix, "/") {
|
||||||
|
prefix += "/"
|
||||||
|
}
|
||||||
|
return prefix + key
|
||||||
|
}
|
||||||
|
|
|
@ -4,9 +4,10 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"golang.org/x/net/context"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
"cloud.google.com/go/storage"
|
"cloud.google.com/go/storage"
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -64,6 +65,35 @@ func TestNewUpload(t *testing.T) {
|
||||||
assert.Equal(id, mockID)
|
assert.Equal(id, mockID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewUploadWithPrefix(t *testing.T) {
|
||||||
|
mockCtrl := gomock.NewController(t)
|
||||||
|
defer mockCtrl.Finish()
|
||||||
|
assert := assert.New(t)
|
||||||
|
|
||||||
|
service := NewMockGCSAPI(mockCtrl)
|
||||||
|
store := gcsstore.New(mockBucket, service)
|
||||||
|
store.ObjectPrefix = "/path/to/file"
|
||||||
|
|
||||||
|
assert.Equal(store.Bucket, mockBucket)
|
||||||
|
|
||||||
|
data, err := json.Marshal(mockTusdInfo)
|
||||||
|
assert.Nil(err)
|
||||||
|
|
||||||
|
r := bytes.NewReader(data)
|
||||||
|
|
||||||
|
params := gcsstore.GCSObjectParams{
|
||||||
|
Bucket: store.Bucket,
|
||||||
|
ID: fmt.Sprintf("%s.info", "/path/to/file/"+mockID),
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil)
|
||||||
|
|
||||||
|
id, err := store.NewUpload(mockTusdInfo)
|
||||||
|
assert.Nil(err)
|
||||||
|
assert.Equal(id, mockID)
|
||||||
|
}
|
||||||
|
|
||||||
type MockGetInfoReader struct{}
|
type MockGetInfoReader struct{}
|
||||||
|
|
||||||
func (r MockGetInfoReader) Close() error {
|
func (r MockGetInfoReader) Close() error {
|
||||||
|
|
Loading…
Reference in New Issue