diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index c7e580c..79015f8 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -59,6 +59,7 @@ func CreateComposer() { store := s3store.New(Flags.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config)) store.ObjectPrefix = Flags.S3ObjectPrefix store.PreferredPartSize = Flags.S3PartSize + store.MaxBufferedParts = Flags.S3MaxBufferedParts store.DisableContentHashes = Flags.S3DisableContentHashes store.SetConcurrentPartUploads(Flags.S3ConcurrentPartUploads) store.UseIn(Composer) diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index 1e12daf..4ebc66a 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -25,6 +25,7 @@ var Flags struct { S3ObjectPrefix string S3Endpoint string S3PartSize int64 + S3MaxBufferedParts int64 S3DisableContentHashes bool S3DisableSSL bool S3ConcurrentPartUploads int @@ -71,6 +72,7 @@ func ParseFlags() { flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names") flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be pass)") flag.Int64Var(&Flags.S3PartSize, "s3-part-size", 50*1024*1024, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)") + flag.Int64Var(&Flags.S3MaxBufferedParts, "s3-max-buffered-parts", 20, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)") flag.BoolVar(&Flags.S3DisableContentHashes, "s3-disable-content-hashes", false, "Disable the calculation of MD5 and SHA256 hashes for the content that gets uploaded to S3 for minimized CPU usage (experimental and may be removed in the future)") flag.BoolVar(&Flags.S3DisableSSL, "s3-disable-ssl", false, "Disable SSL and only use HTTP for communication with S3 (experimental and may be removed in the future)") flag.IntVar(&Flags.S3ConcurrentPartUploads, "s3-concurrent-part-uploads", 10, "Number of concurrent part uploads to S3 (experimental and may be removed in the future)") diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 8323966..b36e209 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -451,7 +451,18 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re var wg sync.WaitGroup var uploadErr error - for fileChunk := range fileChan { + for { + // We acquire the semaphore before starting the goroutine to avoid + // starting many goroutines, most of which are just waiting for the lock. + // We also acquire the semaphore before reading from the channel to reduce + // the number of part files are laying around on disk without being used. + upload.store.uploadSemaphore.Acquire() + fileChunk, more := <-fileChan + if !more { + upload.store.uploadSemaphore.Release() + break + } + partfile := fileChunk.file partsize := fileChunk.size @@ -465,9 +476,6 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re upload.parts = append(upload.parts, part) wg.Add(1) - // We acquire the semaphore before starting the goroutine to avoid - // starting many goroutines, most of which are just waiting for the lock. - upload.store.uploadSemaphore.Acquire() go func(file *os.File, part *s3Part) { defer upload.store.uploadSemaphore.Release() defer wg.Done() @@ -489,9 +497,6 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re }(partfile, part) } else { wg.Add(1) - // We acquire the semaphore before starting the goroutine to avoid - // starting many goroutines, most of which are just waiting for the lock. - upload.store.uploadSemaphore.Acquire() go func(file *os.File) { defer upload.store.uploadSemaphore.Release() defer wg.Done()