diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index 99eb962..f8c64ce 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -56,6 +56,7 @@ func CreateComposer() { stdout.Printf("Using 'gcs://%s' as GCS bucket for storage.\n", Flags.GCSBucket) store := gcsstore.New(Flags.GCSBucket, service) + store.ObjectPrefix = Flags.GCSObjectPrefix store.UseIn(Composer) locker := memorylocker.New() diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index 5bb82de..a26b325 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -3,30 +3,31 @@ package cli import ( "flag" "path/filepath" + "strings" ) var Flags struct { - HttpHost string - HttpPort string - HttpSock string - MaxSize int64 - UploadDir string - StoreSize int64 - Basepath string - Timeout int64 - S3Bucket string - S3ObjectPrefix string - S3Endpoint string - GCSBucket string - FileHooksDir string - HttpHooksEndpoint string - HttpHooksRetry int - HttpHooksBackoff int - HooksStopUploadCode int - ShowVersion bool - ExposeMetrics bool - MetricsPath string - BehindProxy bool + HttpHost string + HttpPort string + HttpSock string + MaxSize int64 + UploadDir string + StoreSize int64 + Basepath string + Timeout int64 + S3Bucket string + S3ObjectPrefix string + S3Endpoint string + GCSBucket string + GCSObjectPrefix string + FileHooksDir string + HttpHooksEndpoint string + HttpHooksRetry int + HttpHooksBackoff int + ShowVersion bool + ExposeMetrics bool + MetricsPath string + BehindProxy bool FileHooksInstalled bool HttpHooksInstalled bool @@ -45,6 +46,7 @@ func ParseFlags() { flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names") flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be pass)") flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)") + flag.StringVar(&Flags.GCSObjectPrefix, "gcs-object-prefix", "", "Prefix for GCS object names (can't contain underscore character)") flag.StringVar(&Flags.FileHooksDir, "hooks-dir", "", "Directory to search for available hooks scripts") flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to") flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout") @@ -76,4 +78,9 @@ func ParseFlags() { "neither flag was provided. Please consult `tusd -help` for " + "more information on these options.") } + + if Flags.GCSObjectPrefix != "" && strings.Contains(Flags.GCSObjectPrefix, "_") { + stderr.Fatalf("gcs-object-prefix value (%s) can't contain underscore. "+ + "Please remove underscore from the value", Flags.GCSObjectPrefix) + } } diff --git a/gcsstore/gcsstore.go b/gcsstore/gcsstore.go index 589ef19..cf2579e 100644 --- a/gcsstore/gcsstore.go +++ b/gcsstore/gcsstore.go @@ -14,13 +14,14 @@ import ( "bytes" "encoding/json" "fmt" - "golang.org/x/net/context" "io" "strconv" "strings" "sync" "sync/atomic" + "golang.org/x/net/context" + "cloud.google.com/go/storage" "github.com/tus/tusd" "github.com/tus/tusd/uid" @@ -32,6 +33,11 @@ type GCSStore struct { // Specifies the GCS bucket that uploads will be stored in Bucket string + // ObjectPrefix is prepended to the name of each GCS object that is created. + // It can be used to create a pseudo-directory structure in the bucket, + // e.g. "path/to/my/uploads". + ObjectPrefix string + // Service specifies an interface used to communicate with the Google // cloud storage backend. Implementation can be seen in gcsservice file. Service GCSAPI @@ -59,7 +65,7 @@ func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) { } ctx := context.Background() - err = store.writeInfo(ctx, info.ID, info) + err = store.writeInfo(ctx, store.keyWithPrefix(info.ID), info) if err != nil { return info.ID, err } @@ -68,7 +74,7 @@ func (store GCSStore) NewUpload(info tusd.FileInfo) (id string, err error) { } func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { - prefix := fmt.Sprintf("%s_", id) + prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id)) filterParams := GCSFilterParams{ Bucket: store.Bucket, Prefix: prefix, @@ -94,7 +100,7 @@ func (store GCSStore) WriteChunk(id string, offset int64, src io.Reader) (int64, } } - cid := fmt.Sprintf("%s_%d", id, maxIdx+1) + cid := fmt.Sprintf("%s_%d", store.keyWithPrefix(id), maxIdx+1) objectParams := GCSObjectParams{ Bucket: store.Bucket, ID: cid, @@ -112,7 +118,7 @@ const CONCURRENT_SIZE_REQUESTS = 32 func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { info := tusd.FileInfo{} - i := fmt.Sprintf("%s.info", id) + i := fmt.Sprintf("%s.info", store.keyWithPrefix(id)) params := GCSObjectParams{ Bucket: store.Bucket, @@ -138,7 +144,7 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { return info, err } - prefix := fmt.Sprintf("%s", id) + prefix := fmt.Sprintf("%s", store.keyWithPrefix(id)) filterParams := GCSFilterParams{ Bucket: store.Bucket, Prefix: prefix, @@ -200,7 +206,7 @@ func (store GCSStore) GetInfo(id string) (tusd.FileInfo, error) { } info.Offset = offset - err = store.writeInfo(ctx, id, info) + err = store.writeInfo(ctx, store.keyWithPrefix(id), info) if err != nil { return info, err } @@ -231,7 +237,7 @@ func (store GCSStore) writeInfo(ctx context.Context, id string, info tusd.FileIn } func (store GCSStore) FinishUpload(id string) error { - prefix := fmt.Sprintf("%s_", id) + prefix := fmt.Sprintf("%s_", store.keyWithPrefix(id)) filterParams := GCSFilterParams{ Bucket: store.Bucket, Prefix: prefix, @@ -245,7 +251,7 @@ func (store GCSStore) FinishUpload(id string) error { composeParams := GCSComposeParams{ Bucket: store.Bucket, - Destination: id, + Destination: store.keyWithPrefix(id), Sources: names, } @@ -266,7 +272,7 @@ func (store GCSStore) FinishUpload(id string) error { objectParams := GCSObjectParams{ Bucket: store.Bucket, - ID: id, + ID: store.keyWithPrefix(id), } err = store.Service.SetObjectMetadata(ctx, objectParams, info.MetaData) @@ -280,7 +286,7 @@ func (store GCSStore) FinishUpload(id string) error { func (store GCSStore) Terminate(id string) error { filterParams := GCSFilterParams{ Bucket: store.Bucket, - Prefix: id, + Prefix: store.keyWithPrefix(id), } ctx := context.Background() @@ -295,7 +301,7 @@ func (store GCSStore) Terminate(id string) error { func (store GCSStore) GetReader(id string) (io.Reader, error) { params := GCSObjectParams{ Bucket: store.Bucket, - ID: id, + ID: store.keyWithPrefix(id), } ctx := context.Background() @@ -306,3 +312,11 @@ func (store GCSStore) GetReader(id string) (io.Reader, error) { return r, nil } + +func (store GCSStore) keyWithPrefix(key string) string { + prefix := store.ObjectPrefix + if prefix != "" && !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + return prefix + key +} diff --git a/gcsstore/gcsstore_test.go b/gcsstore/gcsstore_test.go index fbefcc2..7ab0101 100644 --- a/gcsstore/gcsstore_test.go +++ b/gcsstore/gcsstore_test.go @@ -4,9 +4,10 @@ import ( "bytes" "encoding/json" "fmt" - "golang.org/x/net/context" "testing" + "golang.org/x/net/context" + "cloud.google.com/go/storage" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -64,6 +65,35 @@ func TestNewUpload(t *testing.T) { assert.Equal(id, mockID) } +func TestNewUploadWithPrefix(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + assert := assert.New(t) + + service := NewMockGCSAPI(mockCtrl) + store := gcsstore.New(mockBucket, service) + store.ObjectPrefix = "/path/to/file" + + assert.Equal(store.Bucket, mockBucket) + + data, err := json.Marshal(mockTusdInfo) + assert.Nil(err) + + r := bytes.NewReader(data) + + params := gcsstore.GCSObjectParams{ + Bucket: store.Bucket, + ID: fmt.Sprintf("%s.info", "/path/to/file/"+mockID), + } + + ctx := context.Background() + service.EXPECT().WriteObject(ctx, params, r).Return(int64(r.Len()), nil) + + id, err := store.NewUpload(mockTusdInfo) + assert.Nil(err) + assert.Equal(id, mockID) +} + type MockGetInfoReader struct{} func (r MockGetInfoReader) Close() error {