Add flag to disable calculating expensive hashes for S3

This commit is contained in:
Marius 2021-01-18 13:31:07 +01:00
parent 0822c0ac43
commit 67fd74e129
3 changed files with 91 additions and 5 deletions

View File

@ -30,6 +30,12 @@ func CreateComposer() {
s3Config = s3Config.WithS3UseAccelerate(true) s3Config = s3Config.WithS3UseAccelerate(true)
} }
if Flags.S3DisableContentHashes {
// Prevent the S3 service client from automatically
// adding the Content-MD5 header to S3 Object Put and Upload API calls.
s3Config = s3Config.WithS3DisableContentMD5Validation(true)
}
if Flags.S3Endpoint == "" { if Flags.S3Endpoint == "" {
if Flags.S3TransferAcceleration { if Flags.S3TransferAcceleration {
@ -49,6 +55,7 @@ func CreateComposer() {
store := s3store.New(Flags.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config)) store := s3store.New(Flags.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
store.ObjectPrefix = Flags.S3ObjectPrefix store.ObjectPrefix = Flags.S3ObjectPrefix
store.PreferredPartSize = Flags.S3PartSize store.PreferredPartSize = Flags.S3PartSize
store.DisableContentHashes = Flags.S3DisableContentHashes
store.UseIn(Composer) store.UseIn(Composer)
locker := memorylocker.New() locker := memorylocker.New()

View File

@ -2,8 +2,13 @@ package cli
import ( import (
"flag" "flag"
"fmt"
"log"
"os"
"path/filepath" "path/filepath"
"runtime/pprof"
"strings" "strings"
"time"
"github.com/tus/tusd/cmd/tusd/cli/hooks" "github.com/tus/tusd/cmd/tusd/cli/hooks"
) )
@ -20,6 +25,7 @@ var Flags struct {
S3ObjectPrefix string S3ObjectPrefix string
S3Endpoint string S3Endpoint string
S3PartSize int64 S3PartSize int64
S3DisableContentHashes bool
GCSBucket string GCSBucket string
GCSObjectPrefix string GCSObjectPrefix string
EnabledHooksString string EnabledHooksString string
@ -43,6 +49,8 @@ var Flags struct {
TLSCertFile string TLSCertFile string
TLSKeyFile string TLSKeyFile string
TLSMode string TLSMode string
CPUProfile string
} }
func ParseFlags() { func ParseFlags() {
@ -57,6 +65,7 @@ func ParseFlags() {
flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names") flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names")
flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be pass)") flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be pass)")
flag.Int64Var(&Flags.S3PartSize, "s3-part-size", 50*1024*1024, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)") flag.Int64Var(&Flags.S3PartSize, "s3-part-size", 50*1024*1024, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)")
flag.BoolVar(&Flags.S3DisableContentHashes, "s3-disable-content-hashes", false, "Disable the calculation of MD5 and SHA256 hashes for the content that gets uploaded to S3 for minimized CPU usage (experimental and may be removed in the future)")
flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)") flag.StringVar(&Flags.GCSBucket, "gcs-bucket", "", "Use Google Cloud Storage with this bucket as storage backend (requires the GCS_SERVICE_ACCOUNT_FILE environment variable to be set)")
flag.StringVar(&Flags.GCSObjectPrefix, "gcs-object-prefix", "", "Prefix for GCS object names (can't contain underscore character)") flag.StringVar(&Flags.GCSObjectPrefix, "gcs-object-prefix", "", "Prefix for GCS object names (can't contain underscore character)")
flag.StringVar(&Flags.EnabledHooksString, "hooks-enabled-events", "pre-create,post-create,post-receive,post-terminate,post-finish", "Comma separated list of enabled hook events (e.g. post-create,post-finish). Leave empty to enable default events") flag.StringVar(&Flags.EnabledHooksString, "hooks-enabled-events", "pre-create,post-create,post-receive,post-terminate,post-finish", "Comma separated list of enabled hook events (e.g. post-create,post-finish). Leave empty to enable default events")
@ -79,6 +88,8 @@ func ParseFlags() {
flag.StringVar(&Flags.TLSCertFile, "tls-certificate", "", "Path to the file containing the x509 TLS certificate to be used. The file should also contain any intermediate certificates and the CA certificate.") flag.StringVar(&Flags.TLSCertFile, "tls-certificate", "", "Path to the file containing the x509 TLS certificate to be used. The file should also contain any intermediate certificates and the CA certificate.")
flag.StringVar(&Flags.TLSKeyFile, "tls-key", "", "Path to the file containing the key for the TLS certificate.") flag.StringVar(&Flags.TLSKeyFile, "tls-key", "", "Path to the file containing the key for the TLS certificate.")
flag.StringVar(&Flags.TLSMode, "tls-mode", "tls12", "Specify which TLS mode to use; valid modes are tls13, tls12, and tls12-strong.") flag.StringVar(&Flags.TLSMode, "tls-mode", "tls12", "Specify which TLS mode to use; valid modes are tls13, tls12, and tls12-strong.")
flag.StringVar(&Flags.CPUProfile, "cpuprofile", "", "write cpu profile to file")
flag.Parse() flag.Parse()
SetEnabledHooks() SetEnabledHooks()
@ -86,6 +97,20 @@ func ParseFlags() {
if Flags.FileHooksDir != "" { if Flags.FileHooksDir != "" {
Flags.FileHooksDir, _ = filepath.Abs(Flags.FileHooksDir) Flags.FileHooksDir, _ = filepath.Abs(Flags.FileHooksDir)
} }
if Flags.CPUProfile != "" {
f, err := os.Create(Flags.CPUProfile)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
go func() {
<-time.After(20 * time.Second)
pprof.StopCPUProfile()
fmt.Println("Stopped CPU profile")
}()
}
} }
func SetEnabledHooks() { func SetEnabledHooks() {

View File

@ -76,10 +76,12 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http"
"os" "os"
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"time"
"github.com/tus/tusd/internal/uid" "github.com/tus/tusd/internal/uid"
"github.com/tus/tusd/pkg/handler" "github.com/tus/tusd/pkg/handler"
@ -150,6 +152,12 @@ type S3Store struct {
// on disk during the upload. An empty string ("", the default value) will // on disk during the upload. An empty string ("", the default value) will
// cause S3Store to use the operating system's default temporary directory. // cause S3Store to use the operating system's default temporary directory.
TemporaryDirectory string TemporaryDirectory string
// DisableContentHashes instructs the S3Store to not calculate the MD5 and SHA256
// hashes when uploading data to S3. These hashes are used for file integrity checks
// and for authentication. However, these hashes also consume a significant amount of
// CPU, so it might be desirable to disable them.
// Note that this property is experimental and might be removed in the future!
DisableContentHashes bool
} }
type S3API interface { type S3API interface {
@ -165,6 +173,10 @@ type S3API interface {
UploadPartCopyWithContext(ctx context.Context, input *s3.UploadPartCopyInput, opt ...request.Option) (*s3.UploadPartCopyOutput, error) UploadPartCopyWithContext(ctx context.Context, input *s3.UploadPartCopyInput, opt ...request.Option) (*s3.UploadPartCopyOutput, error)
} }
type s3APIForPresigning interface {
UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput)
}
// New constructs a new storage using the supplied bucket and service object. // New constructs a new storage using the supplied bucket and service object.
func New(bucket string, service S3API) S3Store { func New(bucket string, service S3API) S3Store {
return S3Store{ return S3Store{
@ -433,9 +445,8 @@ func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Read
Key: store.keyWithPrefix(uploadId), Key: store.keyWithPrefix(uploadId),
UploadId: aws.String(multipartId), UploadId: aws.String(multipartId),
PartNumber: aws.Int64(nextPartNum), PartNumber: aws.Int64(nextPartNum),
Body: file,
} }
if err := upload.putPartForUpload(ctx, uploadPartInput, file); err != nil { if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil {
return bytesUploaded, err return bytesUploaded, err
} }
} else { } else {
@ -461,11 +472,54 @@ func cleanUpTempFile(file *os.File) {
os.Remove(file.Name()) os.Remove(file.Name())
} }
func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File) error { func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) error {
defer cleanUpTempFile(file) defer cleanUpTempFile(file)
_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput) if !upload.store.DisableContentHashes {
return err // By default, use the traditional approach to upload data
uploadPartInput.Body = file
_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
return err
} else {
// Experimental feature to prevent the AWS SDK from calculating the SHA256 hash
// for the parts we upload to S3.
// We compute the presigned URL without the body attached and then send the request
// on our own. This way, the body is not included in the SHA256 calculation.
s3api, ok := upload.store.Service.(s3APIForPresigning)
if !ok {
return fmt.Errorf("s3store: failed to cast S3 service for presigning")
}
s3Req, _ := s3api.UploadPartRequest(uploadPartInput)
url, err := s3Req.Presign(15 * time.Minute)
if err != nil {
return err
}
req, err := http.NewRequest("PUT", url, file)
if err != nil {
return err
}
// Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked,
// which is not supported by AWS S3.
req.ContentLength = size
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
buf := new(strings.Builder)
io.Copy(buf, res.Body)
return fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String())
}
return nil
}
} }
func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) { func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {