cli: Add flag to customize the S3 part buffer size

Marius 2021-06-21 13:17:36 +02:00
parent 36f12b1d18
commit ccdfe8e604
3 changed files with 15 additions and 7 deletions

cmd/tusd/cli/composer.go

@@ -59,6 +59,7 @@ func CreateComposer() {
 	store := s3store.New(Flags.S3Bucket, s3.New(session.Must(session.NewSession()), s3Config))
 	store.ObjectPrefix = Flags.S3ObjectPrefix
 	store.PreferredPartSize = Flags.S3PartSize
+	store.MaxBufferedParts = Flags.S3MaxBufferedParts
 	store.DisableContentHashes = Flags.S3DisableContentHashes
 	store.SetConcurrentPartUploads(Flags.S3ConcurrentPartUploads)
 	store.UseIn(Composer)
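
For programmatic users of the s3store package, the same knob is exposed as a struct field. Below is a minimal sketch of setting it directly, assuming the tusd v1 import paths (github.com/tus/tusd/pkg/...) and a placeholder bucket name:

package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"

	"github.com/tus/tusd/pkg/handler"
	"github.com/tus/tusd/pkg/s3store"
)

func main() {
	// "my-bucket" and the default session are placeholders.
	s3Config := aws.NewConfig()
	store := s3store.New("my-bucket", s3.New(session.Must(session.NewSession()), s3Config))

	// The new field: how many received-but-not-yet-uploaded parts may
	// wait on disk before incoming data is paused (the CLI default is 20).
	store.MaxBufferedParts = 30

	composer := handler.NewStoreComposer()
	store.UseIn(composer)
	_ = composer // pass via handler.Config{StoreComposer: composer} in a real setup
}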

cmd/tusd/cli/flags.go

@@ -25,6 +25,7 @@ var Flags struct {
 	S3ObjectPrefix          string
 	S3Endpoint              string
 	S3PartSize              int64
+	S3MaxBufferedParts      int64
 	S3DisableContentHashes  bool
 	S3DisableSSL            bool
 	S3ConcurrentPartUploads int
@@ -71,6 +72,7 @@ func ParseFlags() {
 	flag.StringVar(&Flags.S3ObjectPrefix, "s3-object-prefix", "", "Prefix for S3 object names")
 	flag.StringVar(&Flags.S3Endpoint, "s3-endpoint", "", "Endpoint to use S3 compatible implementations like minio (requires s3-bucket to be set)")
 	flag.Int64Var(&Flags.S3PartSize, "s3-part-size", 50*1024*1024, "Size in bytes of the individual upload requests made to the S3 API. Defaults to 50MiB (experimental and may be removed in the future)")
+	flag.Int64Var(&Flags.S3MaxBufferedParts, "s3-max-buffered-parts", 20, "Maximum number of parts which can be buffered on disk while they wait for their upload to S3. Defaults to 20 (experimental and may be removed in the future)")
 	flag.BoolVar(&Flags.S3DisableContentHashes, "s3-disable-content-hashes", false, "Disable the calculation of MD5 and SHA256 hashes for the content that gets uploaded to S3 for minimized CPU usage (experimental and may be removed in the future)")
 	flag.BoolVar(&Flags.S3DisableSSL, "s3-disable-ssl", false, "Disable SSL and only use HTTP for communication with S3 (experimental and may be removed in the future)")
 	flag.IntVar(&Flags.S3ConcurrentPartUploads, "s3-concurrent-part-uploads", 10, "Number of concurrent part uploads to S3 (experimental and may be removed in the future)")
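
As a rough illustration of how these defaults interact (an estimate, not code from this commit): each buffered or in-flight part can occupy up to s3-part-size bytes on disk, so the worst case is approximately (buffered parts + concurrent uploads) times the part size.

package main

import "fmt"

func main() {
	const partSize = 50 * 1024 * 1024 // -s3-part-size default
	const maxBufferedParts = 20       // -s3-max-buffered-parts default
	const concurrentUploads = 10      // -s3-concurrent-part-uploads default

	// Rough upper bound on part files sitting on disk for one upload:
	// buffered parts plus the parts currently being sent to S3.
	bound := int64((maxBufferedParts + concurrentUploads) * partSize)
	fmt.Printf("~%d MiB\n", bound/(1024*1024)) // prints ~1500 MiB
}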

pkg/s3store/s3store.go

@@ -451,7 +451,18 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 	var wg sync.WaitGroup
 	var uploadErr error

-	for fileChunk := range fileChan {
+	for {
+		// We acquire the semaphore before starting the goroutine to avoid
+		// starting many goroutines, most of which are just waiting for the lock.
+		// We also acquire the semaphore before reading from the channel to reduce
+		// the number of part files that are lying around on disk without being used.
+		upload.store.uploadSemaphore.Acquire()
+		fileChunk, more := <-fileChan
+		if !more {
+			upload.store.uploadSemaphore.Release()
+			break
+		}
+
 		partfile := fileChunk.file
 		partsize := fileChunk.size
@@ -465,9 +476,6 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 		upload.parts = append(upload.parts, part)

 		wg.Add(1)
-		// We acquire the semaphore before starting the goroutine to avoid
-		// starting many goroutines, most of which are just waiting for the lock.
-		upload.store.uploadSemaphore.Acquire()
 		go func(file *os.File, part *s3Part) {
 			defer upload.store.uploadSemaphore.Release()
 			defer wg.Done()
@@ -489,9 +497,6 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 			}(partfile, part)
 		} else {
 			wg.Add(1)
-			// We acquire the semaphore before starting the goroutine to avoid
-			// starting many goroutines, most of which are just waiting for the lock.
-			upload.store.uploadSemaphore.Acquire()
 			go func(file *os.File) {
 				defer upload.store.uploadSemaphore.Release()
 				defer wg.Done()
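
The pattern in these hunks, acquiring the semaphore before receiving from the channel rather than just before spawning the goroutine, is what bounds the number of part files waiting on disk. Below is a self-contained sketch of the same pattern; the semaphore type and all names are illustrative, not tusd's actual implementation:

package main

import (
	"fmt"
	"sync"
)

// A counting semaphore backed by a buffered channel; tusd's
// uploadSemaphore is assumed to behave similarly.
type semaphore chan struct{}

func (s semaphore) Acquire() { s <- struct{}{} }
func (s semaphore) Release() { <-s }

func main() {
	work := make(chan int)
	sem := make(semaphore, 3) // at most 3 concurrent workers
	var wg sync.WaitGroup

	// Producer: stands in for the code that splits the upload into parts.
	go func() {
		for i := 0; i < 10; i++ {
			work <- i
		}
		close(work)
	}()

	for {
		// Acquiring before receiving throttles the producer transitively:
		// items are only taken off the channel once a worker slot frees up,
		// which keeps the number of pending items (part files on disk) low.
		sem.Acquire()
		item, more := <-work
		if !more {
			sem.Release()
			break
		}

		wg.Add(1)
		go func(n int) {
			defer sem.Release()
			defer wg.Done()
			fmt.Println("uploading part", n)
		}(item)
	}
	wg.Wait()
}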