From 946539c3b90f1ce5d69e2a7450bfa5defb9be949 Mon Sep 17 00:00:00 2001 From: Marius Date: Fri, 28 May 2021 13:26:13 +0200 Subject: [PATCH] cli: Add option to expose Go's pprof --- cmd/tusd/cli/flags.go | 8 +++++++ cmd/tusd/cli/metrics.go | 4 ++-- cmd/tusd/cli/pprof.go | 18 ++++++++++++++ cmd/tusd/cli/serve.go | 30 +++++++++++++++--------- pkg/s3store/s3store.go | 35 ++++++++++++++++++---------- pkg/s3store/s3store_part_producer.go | 31 ++++++++++++++++-------- 6 files changed, 91 insertions(+), 35 deletions(-) create mode 100644 cmd/tusd/cli/pprof.go diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index ee91392..1e12daf 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -45,6 +45,10 @@ var Flags struct { ShowVersion bool ExposeMetrics bool MetricsPath string + ExposePprof bool + PprofPath string + PprofBlockProfileRate int + PprofMutexProfileRate int BehindProxy bool VerboseOutput bool S3TransferAcceleration bool @@ -86,6 +90,10 @@ func ParseFlags() { flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information") flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage") flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible") + flag.BoolVar(&Flags.ExposePprof, "expose-pprof", false, "Expose the pprof interface over HTTP for profiling tusd") + flag.StringVar(&Flags.PprofPath, "pprof-path", "/debug/pprof/", "Path under which the pprof endpoint will be accessible") + flag.IntVar(&Flags.PprofBlockProfileRate, "pprof-block-profile-rate", 0, "Fraction of goroutine blocking events that are reported in the blocking profile") + flag.IntVar(&Flags.PprofMutexProfileRate, "pprof-mutex-profile-rate", 0, "Fraction of mutex contention events that are reported in the mutex profile") flag.BoolVar(&Flags.BehindProxy, "behind-proxy", false, "Respect X-Forwarded-* and similar headers which may be set by proxies") flag.BoolVar(&Flags.VerboseOutput, "verbose", true, "Enable verbose logging output") flag.BoolVar(&Flags.S3TransferAcceleration, "s3-transfer-acceleration", false, "Use AWS S3 transfer acceleration endpoint (requires -s3-bucket option and Transfer Acceleration property on S3 bucket to be set)") diff --git a/cmd/tusd/cli/metrics.go b/cmd/tusd/cli/metrics.go index 5ecd8e2..2052c18 100644 --- a/cmd/tusd/cli/metrics.go +++ b/cmd/tusd/cli/metrics.go @@ -31,12 +31,12 @@ var MetricsHookInvocationsTotal = prometheus.NewCounterVec( []string{"hooktype"}, ) -func SetupMetrics(handler *handler.Handler) { +func SetupMetrics(mux *http.ServeMux, handler *handler.Handler) { prometheus.MustRegister(MetricsOpenConnections) prometheus.MustRegister(MetricsHookErrorsTotal) prometheus.MustRegister(MetricsHookInvocationsTotal) prometheus.MustRegister(prometheuscollector.New(handler.Metrics)) stdout.Printf("Using %s as the metrics path.\n", Flags.MetricsPath) - http.Handle(Flags.MetricsPath, promhttp.Handler()) + mux.Handle(Flags.MetricsPath, promhttp.Handler()) } diff --git a/cmd/tusd/cli/pprof.go b/cmd/tusd/cli/pprof.go new file mode 100644 index 0000000..6c37e5c --- /dev/null +++ b/cmd/tusd/cli/pprof.go @@ -0,0 +1,18 @@ +package cli + +import ( + "net/http" + "net/http/pprof" + "runtime" +) + +func SetupPprof(mux *http.ServeMux) { + runtime.SetBlockProfileRate(Flags.PprofBlockProfileRate) + runtime.SetMutexProfileFraction(Flags.PprofMutexProfileRate) + + mux.HandleFunc(Flags.PprofPath, pprof.Index) + mux.HandleFunc(Flags.PprofPath+"cmdline", pprof.Cmdline) + mux.HandleFunc(Flags.PprofPath+"profile", pprof.Profile) + mux.HandleFunc(Flags.PprofPath+"symbol", pprof.Symbol) + mux.HandleFunc(Flags.PprofPath+"trace", pprof.Trace) +} diff --git a/cmd/tusd/cli/serve.go b/cmd/tusd/cli/serve.go index aad6ad1..cada5c4 100644 --- a/cmd/tusd/cli/serve.go +++ b/cmd/tusd/cli/serve.go @@ -58,28 +58,33 @@ func Serve() { SetupPostHooks(handler) - if Flags.ExposeMetrics { - SetupMetrics(handler) - SetupHookMetrics() - } - stdout.Printf("Supported tus extensions: %s\n", handler.SupportedExtensions()) + mux := http.NewServeMux() if basepath == "/" { // If the basepath is set to the root path, only install the tusd handler // and do not show a greeting. - http.Handle("/", http.StripPrefix("/", handler)) + mux.Handle("/", http.StripPrefix("/", handler)) } else { // If a custom basepath is defined, we show a greeting at the root path... - http.HandleFunc("/", DisplayGreeting) + mux.HandleFunc("/", DisplayGreeting) // ... and register a route with and without the trailing slash, so we can // handle uploads for /files/ and /files, for example. basepathWithoutSlash := strings.TrimSuffix(basepath, "/") basepathWithSlash := basepathWithoutSlash + "/" - http.Handle(basepathWithSlash, http.StripPrefix(basepathWithSlash, handler)) - http.Handle(basepathWithoutSlash, http.StripPrefix(basepathWithoutSlash, handler)) + mux.Handle(basepathWithSlash, http.StripPrefix(basepathWithSlash, handler)) + mux.Handle(basepathWithoutSlash, http.StripPrefix(basepathWithoutSlash, handler)) + } + + if Flags.ExposeMetrics { + SetupMetrics(mux, handler) + SetupHookMetrics() + } + + if Flags.ExposePprof { + SetupPprof(mux) } var listener net.Listener @@ -106,14 +111,17 @@ func Serve() { // If we're not using TLS just start the server and, if http.Serve() returns, just return. if protocol == "http" { - if err = http.Serve(listener, nil); err != nil { + if err = http.Serve(listener, mux); err != nil { stderr.Fatalf("Unable to serve: %s", err) } return } + // TODO: Move TLS handling into own file. // Fall-through for TLS mode. - server := &http.Server{} + server := &http.Server{ + Handler: mux, + } switch Flags.TLSMode { case TLS13: server.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS13} diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 0d2431f..8323966 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -166,6 +166,9 @@ type S3Store struct { // requestDurationMetric holds the prometheus instance for storing the request durations. requestDurationMetric *prometheus.SummaryVec + + // diskWriteDurationMetric holds the prometheus instance for storing the time it takes to write chunks to disk. + diskWriteDurationMetric prometheus.Summary } // The labels to use for observing and storing request duration. One label per operation. @@ -208,18 +211,25 @@ func New(bucket string, service S3API) S3Store { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }, []string{"operation"}) + diskWriteDurationMetric := prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "tusd_s3_disk_write_duration_ms", + Help: "Duration of chunk writes to disk in milliseconds", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }) + return S3Store{ - Bucket: bucket, - Service: service, - MaxPartSize: 5 * 1024 * 1024 * 1024, - MinPartSize: 5 * 1024 * 1024, - PreferredPartSize: 50 * 1024 * 1024, - MaxMultipartParts: 10000, - MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, - MaxBufferedParts: 20, - TemporaryDirectory: "", - uploadSemaphore: semaphore.New(10), - requestDurationMetric: requestDurationMetric, + Bucket: bucket, + Service: service, + MaxPartSize: 5 * 1024 * 1024 * 1024, + MinPartSize: 5 * 1024 * 1024, + PreferredPartSize: 50 * 1024 * 1024, + MaxMultipartParts: 10000, + MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, + MaxBufferedParts: 20, + TemporaryDirectory: "", + uploadSemaphore: semaphore.New(10), + requestDurationMetric: requestDurationMetric, + diskWriteDurationMetric: diskWriteDurationMetric, } } @@ -239,6 +249,7 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) { func (store S3Store) RegisterMetrics(registry prometheus.Registerer) { registry.MustRegister(store.requestDurationMetric) + registry.MustRegister(store.diskWriteDurationMetric) } func (store S3Store) observeRequestDuration(start time.Time, label string) { @@ -433,7 +444,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re numParts := len(parts) nextPartNum := int64(numParts + 1) - partProducer, fileChan := newS3PartProducer(src, store.MaxBufferedParts, store.TemporaryDirectory) + partProducer, fileChan := newS3PartProducer(src, store.MaxBufferedParts, store.TemporaryDirectory, store.diskWriteDurationMetric) defer partProducer.stop() go partProducer.produce(optimalPartSize) diff --git a/pkg/s3store/s3store_part_producer.go b/pkg/s3store/s3store_part_producer.go index 94d1a0e..0f7c9dd 100644 --- a/pkg/s3store/s3store_part_producer.go +++ b/pkg/s3store/s3store_part_producer.go @@ -4,15 +4,19 @@ import ( "io" "io/ioutil" "os" + "time" + + "github.com/prometheus/client_golang/prometheus" ) // s3PartProducer converts a stream of bytes from the reader into a stream of files on disk type s3PartProducer struct { - tmpDir string - files chan fileChunk - done chan struct{} - err error - r io.Reader + tmpDir string + files chan fileChunk + done chan struct{} + err error + r io.Reader + diskWriteDurationMetric prometheus.Summary } type fileChunk struct { @@ -20,15 +24,16 @@ type fileChunk struct { size int64 } -func newS3PartProducer(source io.Reader, backlog int64, tmpDir string) (s3PartProducer, <-chan fileChunk) { +func newS3PartProducer(source io.Reader, backlog int64, tmpDir string, diskWriteDurationMetric prometheus.Summary) (s3PartProducer, <-chan fileChunk) { fileChan := make(chan fileChunk, backlog) doneChan := make(chan struct{}) partProducer := s3PartProducer{ - tmpDir: tmpDir, - done: doneChan, - files: fileChan, - r: source, + tmpDir: tmpDir, + done: doneChan, + files: fileChan, + r: source, + diskWriteDurationMetric: diskWriteDurationMetric, } return partProducer, fileChan @@ -78,6 +83,8 @@ func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) { } limitedReader := io.LimitReader(spp.r, size) + start := time.Now() + n, err := io.Copy(file, limitedReader) if err != nil { return fileChunk{}, false, err @@ -91,6 +98,10 @@ func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) { return fileChunk{}, false, nil } + elapsed := time.Now().Sub(start) + ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond)) + spp.diskWriteDurationMetric.Observe(ms) + // Seek to the beginning of the file file.Seek(0, 0)