From 67fd74e1299f24946f97794f12c8807740d2578a Mon Sep 17 00:00:00 2001 From: Marius Date: Mon, 18 Jan 2021 13:31:07 +0100 Subject: [PATCH] Add flag to disable calculating expensive hashes for S3 --- cmd/tusd/cli/composer.go | 7 +++++ cmd/tusd/cli/flags.go | 25 ++++++++++++++++ pkg/s3store/s3store.go | 64 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 91 insertions(+), 5 deletions(-) diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index 71a410f..d4ecd4f 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -30,6 +30,12 @@ func CreateComposer() { s3Config = s3Config.WithS3UseAccelerate(true) } + if Flags.S3DisableContentHashes { + // Prevent the S3 service client from automatically + // adding the Content-MD5 header to S3 Object Put and Upload API calls. + s3Config = s3Config.WithS3DisableContentMD5Validation(true) + } + if Flags.S3Endpoint == "" { if Flags.S3TransferAcceleration { @@ -49,6 +55,7 @@ func CreateComposer() { store := s3store.New(Flags.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config)) store.ObjectPrefix = Flags.S3ObjectPrefix store.PreferredPartSize = Flags.S3PartSize + store.DisableContentHashes = Flags.S3DisableContentHashes store.UseIn(Composer) locker := memorylocker.New() diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index 3eeaf26..a13bd5b 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -2,8 +2,13 @@ package cli import ( "flag" + "fmt" + "log" + "os" "path/filepath" + "runtime/pprof" "strings" + "time" "github.com/tus/tusd/cmd/tusd/cli/hooks" ) @@ -20,6 +25,7 @@ var Flags struct { S3ObjectPrefix string S3Endpoint string S3PartSize int64 + S3DisableContentHashes bool GCSBucket string GCSObjectPrefix string EnabledHooksString string @@ -43,6 +49,8 @@ var Flags struct { TLSCertFile string TLSKeyFile string TLSMode string + + CPUProfile string } func ParseFlags() { @@ -57,6 +65,7 @@ func ParseFlags() { flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", 
"Prefix for S3 object names") flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be pass)") flag.Int64Var(&Flags.S3PartSize, "s3-part-size", 50*1024*1024, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)") + flag.BoolVar(&Flags.S3DisableContentHashes, "s3-disable-content-hashes", false, "Disable the calculation of MD5 and SHA256 hashes for the content that gets uploaded to S3 to minimize CPU usage (experimental and may be removed in the future)") flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)") flag.StringVar(&Flags.GCSObjectPrefix, "gcs-object-prefix", "", "Prefix for GCS object names (can't contain underscore character)") flag.StringVar(&Flags.EnabledHooksString, "hooks-enabled-events", "pre-create,post-create,post-receive,post-terminate,post-finish", "Comma separated list of enabled hook events (e.g. post-create,post-finish). Leave empty to enable default events") @@ -79,6 +88,8 @@ func ParseFlags() { flag.StringVar(&Flags.TLSCertFile, "tls-certificate", "", "Path to the file containing the x509 TLS certificate to be used. 
The file should also contain any intermediate certificates and the CA certificate.") flag.StringVar(&Flags.TLSKeyFile, "tls-key", "", "Path to the file containing the key for the TLS certificate.") flag.StringVar(&Flags.TLSMode, "tls-mode", "tls12", "Specify which TLS mode to use; valid modes are tls13, tls12, and tls12-strong.") + + flag.StringVar(&Flags.CPUProfile, "cpuprofile", "", "write cpu profile to file") flag.Parse() SetEnabledHooks() @@ -86,6 +97,20 @@ func ParseFlags() { if Flags.FileHooksDir != "" { Flags.FileHooksDir, _ = filepath.Abs(Flags.FileHooksDir) } + + if Flags.CPUProfile != "" { + f, err := os.Create(Flags.CPUProfile) + if err != nil { + log.Fatal(err) + } + pprof.StartCPUProfile(f) + + go func() { + <-time.After(20 * time.Second) + pprof.StopCPUProfile() + fmt.Println("Stopped CPU profile") + }() + } } func SetEnabledHooks() { diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index e83b81f..29f5189 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -76,10 +76,12 @@ import ( "fmt" "io" "io/ioutil" + "net/http" "os" "regexp" "strings" "sync" + "time" "github.com/tus/tusd/internal/uid" "github.com/tus/tusd/pkg/handler" @@ -150,6 +152,12 @@ type S3Store struct { // on disk during the upload. An empty string ("", the default value) will // cause S3Store to use the operating system's default temporary directory. TemporaryDirectory string + // DisableContentHashes instructs the S3Store to not calculate the MD5 and SHA256 + // hashes when uploading data to S3. These hashes are used for file integrity checks + // and for authentication. However, these hashes also consume a significant amount of + // CPU, so it might be desirable to disable them. + // Note that this property is experimental and might be removed in the future! 
+ DisableContentHashes bool } type S3API interface { @@ -165,6 +173,10 @@ type S3API interface { UploadPartCopyWithContext(ctx context.Context, input *s3.UploadPartCopyInput, opt ...request.Option) (*s3.UploadPartCopyOutput, error) } +type s3APIForPresigning interface { + UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput) +} + // New constructs a new storage using the supplied bucket and service object. func New(bucket string, service S3API) S3Store { return S3Store{ @@ -433,9 +445,8 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read Key: store.keyWithPrefix(uploadId), UploadId: aws.String(multipartId), PartNumber: aws.Int64(nextPartNum), - Body: file, } - if err := upload.putPartForUpload(ctx, uploadPartInput, file); err != nil { + if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil { return bytesUploaded, err } } else { @@ -461,11 +472,54 @@ func cleanUpTempFile(file *os.File) { os.Remove(file.Name()) } -func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File) error { +func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) error { defer cleanUpTempFile(file) - _, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput) - return err + if !upload.store.DisableContentHashes { + // By default, use the traditional approach to upload data + uploadPartInput.Body = file + _, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput) + return err + } else { + // Experimental feature to prevent the AWS SDK from calculating the SHA256 hash + // for the parts we upload to S3. + // We compute the presigned URL without the body attached and then send the request + // on our own. This way, the body is not included in the SHA256 calculation. 
+ s3api, ok := upload.store.Service.(s3APIForPresigning) + if !ok { + return fmt.Errorf("s3store: failed to cast S3 service for presigning") + } + + s3Req, _ := s3api.UploadPartRequest(uploadPartInput) + + url, err := s3Req.Presign(15 * time.Minute) + if err != nil { + return err + } + + req, err := http.NewRequest("PUT", url, file) + if err != nil { + return err + } + + // Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked, + // which is not supported by AWS S3. + req.ContentLength = size + + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != 200 { + buf := new(strings.Builder) + io.Copy(buf, res.Body) + return fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String()) + } + + return nil + } } func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {