From 27c9c4aab534a9802a6334e99c4119f876ace8ff Mon Sep 17 00:00:00 2001
From: Adam Jensen
Date: Mon, 12 Nov 2018 19:14:37 -0500
Subject: [PATCH] Support prefix on S3 object names (#213)

* Add ObjectPrefix field to S3Store

* Integrate S3ObjectPrefix with Flags

* Account for S3ObjectPrefix flag in CreateComposer

* Account for ObjectPrefix in S3Store operations

* Add test for configuring an S3Store with ObjectPrefix
---
 cmd/tusd/cli/composer.go |  1 +
 cmd/tusd/cli/flags.go    |  2 ++
 s3store/s3store.go       | 37 +++++++++++++++++++++----------
 s3store/s3store_test.go  | 48 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go
index 0113275..2aad4d1 100644
--- a/cmd/tusd/cli/composer.go
+++ b/cmd/tusd/cli/composer.go
@@ -35,6 +35,7 @@ func CreateComposer() {
 		// Derive credentials from default credential chain (env, shared, ec2 instance role)
 		// as per https://github.com/aws/aws-sdk-go#configuring-credentials
 		store := s3store.New(Flags.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
+		store.ObjectPrefix = Flags.S3ObjectPrefix
 		store.UseIn(Composer)
 
 		locker := memorylocker.New()
diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go
index d7c6234..728a485 100644
--- a/cmd/tusd/cli/flags.go
+++ b/cmd/tusd/cli/flags.go
@@ -14,6 +14,7 @@ var Flags struct {
 	Basepath       string
 	Timeout        int64
 	S3Bucket       string
+	S3ObjectPrefix string
 	S3Endpoint     string
 	GCSBucket      string
 	FileHooksDir   string
@@ -38,6 +39,7 @@ func ParseFlags() {
 	flag.StringVar(&Flags.Basepath, "base-path", "/files/", "Basepath of the HTTP server")
 	flag.Int64Var(&Flags.Timeout, "timeout", 30*1000, "Read timeout for connections in milliseconds. A zero value means that reads will not timeout")
 	flag.StringVar(&Flags.S3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)")
+	flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names")
 	flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be passed)")
 	flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)")
 	flag.StringVar(&Flags.FileHooksDir, "hooks-dir", "", "Directory to search for available hooks scripts")
diff --git a/s3store/s3store.go b/s3store/s3store.go
index 00224a6..1beab11 100644
--- a/s3store/s3store.go
+++ b/s3store/s3store.go
@@ -108,6 +108,10 @@ var nonASCIIRegexp = regexp.MustCompile(`([^\x00-\x7F])`)
 type S3Store struct {
 	// Bucket used to store the data in, e.g. "tusdstore.example.com"
 	Bucket string
+	// ObjectPrefix is prepended to the name of each S3 object that is created.
+	// It can be used to create a pseudo-directory structure in the bucket,
+	// e.g. "path/to/my/uploads".
+	ObjectPrefix string
 	// Service specifies an interface used to communicate with the S3 backend.
 	// Usually, this is an instance of github.com/aws/aws-sdk-go/service/s3.S3
 	// (http://docs.aws.amazon.com/sdk-for-go/api/service/s3/S3.html).
@@ -197,7 +201,7 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
 	// Create the actual multipart upload
 	res, err := store.Service.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
 		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
+		Key:      store.keyWithPrefix(uploadId),
 		Metadata: metadata,
 	})
 	if err != nil {
@@ -215,7 +219,7 @@ func (store S3Store) NewUpload(info tusd.FileInfo) (id string, err error) {
 	// Create object on S3 containing information about the file
 	_, err = store.Service.PutObject(&s3.PutObjectInput{
 		Bucket:        aws.String(store.Bucket),
-		Key:           aws.String(uploadId + ".info"),
+		Key:           store.keyWithPrefix(uploadId + ".info"),
 		Body:          bytes.NewReader(infoJson),
 		ContentLength: aws.Int64(int64(len(infoJson))),
 	})
@@ -286,7 +290,7 @@ func (store S3Store) WriteChunk(id string, offset int64, src io.Reader) (int64,
 
 		_, err = store.Service.UploadPart(&s3.UploadPartInput{
 			Bucket:     aws.String(store.Bucket),
-			Key:        aws.String(uploadId),
+			Key:        store.keyWithPrefix(uploadId),
 			UploadId:   aws.String(multipartId),
 			PartNumber: aws.Int64(nextPartNum),
 			Body:       file,
@@ -307,7 +311,7 @@ func (store S3Store) GetInfo(id string) (info tusd.FileInfo, err error) {
 	// Get file info stored in separate object
 	res, err := store.Service.GetObject(&s3.GetObjectInput{
 		Bucket: aws.String(store.Bucket),
-		Key:    aws.String(uploadId + ".info"),
+		Key:    store.keyWithPrefix(uploadId + ".info"),
 	})
 	if err != nil {
 		if isAwsError(err, "NoSuchKey") {
@@ -353,7 +357,7 @@ func (store S3Store) GetReader(id string) (io.Reader, error) {
 	// Attempt to get upload content
 	res, err := store.Service.GetObject(&s3.GetObjectInput{
 		Bucket: aws.String(store.Bucket),
-		Key:    aws.String(uploadId),
+		Key:    store.keyWithPrefix(uploadId),
 	})
 	if err == nil {
 		// No error occurred, and we are able to stream the object
@@ -371,7 +375,7 @@ func (store S3Store) GetReader(id string) (io.Reader, error) {
 	// never existed or just has not been finished yet
 	_, err = store.Service.ListParts(&s3.ListPartsInput{
 		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
+		Key:      store.keyWithPrefix(uploadId),
 		UploadId: aws.String(multipartId),
 		MaxParts: aws.Int64(0),
 	})
@@ -400,7 +404,7 @@ func (store S3Store) Terminate(id string) error {
 		// Abort the multipart upload
 		_, err := store.Service.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
 			Bucket:   aws.String(store.Bucket),
-			Key:      aws.String(uploadId),
+			Key:      store.keyWithPrefix(uploadId),
 			UploadId: aws.String(multipartId),
 		})
 		if err != nil && !isAwsError(err, "NoSuchUpload") {
@@ -417,10 +421,10 @@ func (store S3Store) Terminate(id string) error {
 			Delete: &s3.Delete{
 				Objects: []*s3.ObjectIdentifier{
 					{
-						Key: aws.String(uploadId),
+						Key: store.keyWithPrefix(uploadId),
 					},
 					{
-						Key: aws.String(uploadId + ".info"),
+						Key: store.keyWithPrefix(uploadId + ".info"),
 					},
 				},
 				Quiet: aws.Bool(true),
@@ -470,7 +474,7 @@ func (store S3Store) FinishUpload(id string) error {
 
 	_, err = store.Service.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
 		Bucket:   aws.String(store.Bucket),
-		Key:      aws.String(uploadId),
+		Key:      store.keyWithPrefix(uploadId),
 		UploadId: aws.String(multipartId),
 		MultipartUpload: &s3.CompletedMultipartUpload{
 			Parts: completedParts,
@@ -497,7 +501,7 @@ func (store S3Store) ConcatUploads(dest string, partialUploads []string) error {
 
 			_, err := store.Service.UploadPartCopy(&s3.UploadPartCopyInput{
 				Bucket:   aws.String(store.Bucket),
-				Key:      aws.String(uploadId),
+				Key:      store.keyWithPrefix(uploadId),
 				UploadId: aws.String(multipartId),
 				// Part numbers must be in the range of 1 to 10000, inclusive. Since
 				// slice indexes start at 0, we add 1 to ensure that i >= 1.
@@ -528,7 +532,7 @@ func (store S3Store) listAllParts(id string) (parts []*s3.Part, err error) {
 		// Get uploaded parts
 		listPtr, err := store.Service.ListParts(&s3.ListPartsInput{
 			Bucket:           aws.String(store.Bucket),
-			Key:              aws.String(uploadId),
+			Key:              store.keyWithPrefix(uploadId),
 			UploadId:         aws.String(multipartId),
 			PartNumberMarker: aws.Int64(partMarker),
 		})
@@ -610,3 +614,12 @@ func (store S3Store) calcOptimalPartSize(size int64) (optimalPartSize int64, err
 	}
 	return optimalPartSize, nil
 }
+
+func (store S3Store) keyWithPrefix(key string) *string {
+	prefix := store.ObjectPrefix
+	if prefix != "" && !strings.HasSuffix(prefix, "/") {
+		prefix += "/"
+	}
+
+	return aws.String(prefix + key)
+}
diff --git a/s3store/s3store_test.go b/s3store/s3store_test.go
index ecd8f7f..77deee5 100644
--- a/s3store/s3store_test.go
+++ b/s3store/s3store_test.go
@@ -71,6 +71,54 @@ func TestNewUpload(t *testing.T) {
 	assert.Equal("uploadId+multipartId", id)
 }
 
+func TestNewUploadWithObjectPrefix(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+	assert := assert.New(t)
+
+	s3obj := NewMockS3API(mockCtrl)
+	store := New("bucket", s3obj)
+	store.ObjectPrefix = "my/uploaded/files"
+
+	assert.Equal("bucket", store.Bucket)
+	assert.Equal(s3obj, store.Service)
+
+	s1 := "hello"
+	s2 := "men?"
+
+	gomock.InOrder(
+		s3obj.EXPECT().CreateMultipartUpload(&s3.CreateMultipartUploadInput{
+			Bucket: aws.String("bucket"),
+			Key:    aws.String("my/uploaded/files/uploadId"),
+			Metadata: map[string]*string{
+				"foo": &s1,
+				"bar": &s2,
+			},
+		}).Return(&s3.CreateMultipartUploadOutput{
+			UploadId: aws.String("multipartId"),
+		}, nil),
+		s3obj.EXPECT().PutObject(&s3.PutObjectInput{
+			Bucket:        aws.String("bucket"),
+			Key:           aws.String("my/uploaded/files/uploadId.info"),
+			Body:          bytes.NewReader([]byte(`{"ID":"uploadId+multipartId","Size":500,"SizeIsDeferred":false,"Offset":0,"MetaData":{"bar":"menü","foo":"hello"},"IsPartial":false,"IsFinal":false,"PartialUploads":null}`)),
+			ContentLength: aws.Int64(int64(171)),
+		}),
+	)
+
+	info := tusd.FileInfo{
+		ID:   "uploadId",
+		Size: 500,
+		MetaData: map[string]string{
+			"foo": "hello",
+			"bar": "menü",
+		},
+	}
+
+	id, err := store.NewUpload(info)
+	assert.Nil(err)
+	assert.Equal("uploadId+multipartId", id)
+}
+
 func TestNewUploadLargerMaxObjectSize(t *testing.T) {
 	mockCtrl := gomock.NewController(t)
 	defer mockCtrl.Finish()
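
Usage note (an illustrative sketch, not part of the patch; the bucket name and prefix below are placeholders): once this change is applied, the prefix can be set through the new CLI flag,

    tusd -s3-bucket=example-bucket -s3-object-prefix=path/to/my/uploads

or directly on the store when embedding tusd as a library, assuming credentials come from the default AWS credential chain:

    package main

    import (
    	"github.com/aws/aws-sdk-go/aws/session"
    	"github.com/aws/aws-sdk-go/service/s3"

    	"github.com/tus/tusd/s3store"
    )

    func main() {
    	// "example-bucket" is a placeholder bucket name.
    	store := s3store.New("example-bucket", s3.New(session.Must(session.NewSession())))
    	// Every object, including the ".info" companion objects, is now created
    	// under "path/to/my/uploads/" in the bucket; keyWithPrefix appends the
    	// trailing slash if the configured prefix lacks one.
    	store.ObjectPrefix = "path/to/my/uploads"
    }

Note that the prefix only affects the S3 key names; the upload ID handed back to clients ("uploadId+multipartId" in the tests) is unchanged.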