cli: Add option to expose Go's pprof
commit 946539c3b9 (parent f4314dd360)
@@ -45,6 +45,10 @@ var Flags struct {
 	ShowVersion            bool
 	ExposeMetrics          bool
 	MetricsPath            string
+	ExposePprof            bool
+	PprofPath              string
+	PprofBlockProfileRate  int
+	PprofMutexProfileRate  int
 	BehindProxy            bool
 	VerboseOutput          bool
 	S3TransferAcceleration bool
@@ -86,6 +90,10 @@ func ParseFlags() {
 	flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information")
 	flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage")
 	flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible")
+	flag.BoolVar(&Flags.ExposePprof, "expose-pprof", false, "Expose the pprof interface over HTTP for profiling tusd")
+	flag.StringVar(&Flags.PprofPath, "pprof-path", "/debug/pprof/", "Path under which the pprof endpoint will be accessible")
+	flag.IntVar(&Flags.PprofBlockProfileRate, "pprof-block-profile-rate", 0, "Fraction of goroutine blocking events that are reported in the blocking profile")
+	flag.IntVar(&Flags.PprofMutexProfileRate, "pprof-mutex-profile-rate", 0, "Fraction of mutex contention events that are reported in the mutex profile")
 	flag.BoolVar(&Flags.BehindProxy, "behind-proxy", false, "Respect X-Forwarded-* and similar headers which may be set by proxies")
 	flag.BoolVar(&Flags.VerboseOutput, "verbose", true, "Enable verbose logging output")
 	flag.BoolVar(&Flags.S3TransferAcceleration, "s3-transfer-acceleration", false, "Use AWS S3 transfer acceleration endpoint (requires -s3-bucket option and Transfer Acceleration property on S3 bucket to be set)")
@@ -31,12 +31,12 @@ var MetricsHookInvocationsTotal = prometheus.NewCounterVec(
 	[]string{"hooktype"},
 )
 
-func SetupMetrics(handler *handler.Handler) {
+func SetupMetrics(mux *http.ServeMux, handler *handler.Handler) {
 	prometheus.MustRegister(MetricsOpenConnections)
 	prometheus.MustRegister(MetricsHookErrorsTotal)
 	prometheus.MustRegister(MetricsHookInvocationsTotal)
 	prometheus.MustRegister(prometheuscollector.New(handler.Metrics))
 
 	stdout.Printf("Using %s as the metrics path.\n", Flags.MetricsPath)
-	http.Handle(Flags.MetricsPath, promhttp.Handler())
+	mux.Handle(Flags.MetricsPath, promhttp.Handler())
 }
@@ -0,0 +1,18 @@
+package cli
+
+import (
+	"net/http"
+	"net/http/pprof"
+	"runtime"
+)
+
+func SetupPprof(mux *http.ServeMux) {
+	runtime.SetBlockProfileRate(Flags.PprofBlockProfileRate)
+	runtime.SetMutexProfileFraction(Flags.PprofMutexProfileRate)
+
+	mux.HandleFunc(Flags.PprofPath, pprof.Index)
+	mux.HandleFunc(Flags.PprofPath+"cmdline", pprof.Cmdline)
+	mux.HandleFunc(Flags.PprofPath+"profile", pprof.Profile)
+	mux.HandleFunc(Flags.PprofPath+"symbol", pprof.Symbol)
+	mux.HandleFunc(Flags.PprofPath+"trace", pprof.Trace)
+}
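net/http/pprof registers its handlers only on http.DefaultServeMux (via its init function), so once Serve() routes everything through a dedicated ServeMux, the profiling endpoints have to be registered by hand, which is what SetupPprof above does. Below is a minimal, self-contained sketch of the same pattern; the listen address and the hard-coded path are illustrative assumptions, not values taken from this commit:

package main

import (
	"log"
	"net/http"
	"net/http/pprof"
)

func main() {
	mux := http.NewServeMux()

	// Mirror SetupPprof: register the index handler plus the four endpoints
	// that pprof.Index does not serve itself.
	const path = "/debug/pprof/" // assumption: same default as the new -pprof-path flag
	mux.HandleFunc(path, pprof.Index)
	mux.HandleFunc(path+"cmdline", pprof.Cmdline)
	mux.HandleFunc(path+"profile", pprof.Profile)
	mux.HandleFunc(path+"symbol", pprof.Symbol)
	mux.HandleFunc(path+"trace", pprof.Trace)

	// A CPU profile can then be fetched with, for example:
	//   go tool pprof http://localhost:6060/debug/pprof/profile
	log.Fatal(http.ListenAndServe("localhost:6060", mux))
}

Both runtime.SetBlockProfileRate and runtime.SetMutexProfileFraction default to 0 (profiling off), so the block and mutex profiles stay empty unless the new -pprof-block-profile-rate and -pprof-mutex-profile-rate flags are set to non-zero values.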
@@ -58,28 +58,33 @@ func Serve() {
 	SetupPostHooks(handler)
 
-	if Flags.ExposeMetrics {
-		SetupMetrics(handler)
-		SetupHookMetrics()
-	}
-
 	stdout.Printf("Supported tus extensions: %s\n", handler.SupportedExtensions())
 
+	mux := http.NewServeMux()
 	if basepath == "/" {
 		// If the basepath is set to the root path, only install the tusd handler
 		// and do not show a greeting.
-		http.Handle("/", http.StripPrefix("/", handler))
+		mux.Handle("/", http.StripPrefix("/", handler))
 	} else {
 		// If a custom basepath is defined, we show a greeting at the root path...
-		http.HandleFunc("/", DisplayGreeting)
+		mux.HandleFunc("/", DisplayGreeting)
 
 		// ... and register a route with and without the trailing slash, so we can
 		// handle uploads for /files/ and /files, for example.
 		basepathWithoutSlash := strings.TrimSuffix(basepath, "/")
 		basepathWithSlash := basepathWithoutSlash + "/"
 
-		http.Handle(basepathWithSlash, http.StripPrefix(basepathWithSlash, handler))
-		http.Handle(basepathWithoutSlash, http.StripPrefix(basepathWithoutSlash, handler))
+		mux.Handle(basepathWithSlash, http.StripPrefix(basepathWithSlash, handler))
+		mux.Handle(basepathWithoutSlash, http.StripPrefix(basepathWithoutSlash, handler))
 	}
 
+	if Flags.ExposeMetrics {
+		SetupMetrics(mux, handler)
+		SetupHookMetrics()
+	}
+
+	if Flags.ExposePprof {
+		SetupPprof(mux)
+	}
+
 	var listener net.Listener
 
@@ -106,14 +111,17 @@ func Serve() {
 
 	// If we're not using TLS just start the server and, if http.Serve() returns, just return.
 	if protocol == "http" {
-		if err = http.Serve(listener, nil); err != nil {
+		if err = http.Serve(listener, mux); err != nil {
 			stderr.Fatalf("Unable to serve: %s", err)
 		}
 		return
 	}
 
+	// TODO: Move TLS handling into own file.
 	// Fall-through for TLS mode.
-	server := &http.Server{}
+	server := &http.Server{
+		Handler: mux,
+	}
 	switch Flags.TLSMode {
 	case TLS13:
 		server.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS13}
@@ -166,6 +166,9 @@ type S3Store struct {
 
 	// requestDurationMetric holds the prometheus instance for storing the request durations.
 	requestDurationMetric *prometheus.SummaryVec
+
+	// diskWriteDurationMetric holds the prometheus instance for storing the time it takes to write chunks to disk.
+	diskWriteDurationMetric prometheus.Summary
 }
 
 // The labels to use for observing and storing request duration. One label per operation.
@@ -208,6 +211,12 @@ func New(bucket string, service S3API) S3Store {
 		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
 	}, []string{"operation"})
 
+	diskWriteDurationMetric := prometheus.NewSummary(prometheus.SummaryOpts{
+		Name:       "tusd_s3_disk_write_duration_ms",
+		Help:       "Duration of chunk writes to disk in milliseconds",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
+	})
+
 	return S3Store{
 		Bucket:  bucket,
 		Service: service,
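For reference, the new summary follows the usual client_golang pattern: construct it once, register it, and call Observe per measurement; the configured objectives expose the 0.5/0.9/0.99 quantiles with the stated error tolerances. A small self-contained sketch of that pattern follows; the metric name and registry below are illustrative stand-ins, not tusd's:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Stand-in for tusd_s3_disk_write_duration_ms, renamed so it cannot be
	// mistaken for the real metric.
	writeDuration := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "example_disk_write_duration_ms",
		Help:       "Duration of chunk writes to disk in milliseconds",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})

	reg := prometheus.NewRegistry()
	reg.MustRegister(writeDuration)

	// Record one observation in milliseconds, the unit the metric name implies.
	start := time.Now()
	time.Sleep(3 * time.Millisecond)
	writeDuration.Observe(float64(time.Since(start).Nanoseconds() / int64(time.Millisecond)))

	// Gather and print the sample count to confirm the summary collects data.
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName(), "count:", mf.GetMetric()[0].GetSummary().GetSampleCount())
	}
}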
@@ -220,6 +229,7 @@ func New(bucket string, service S3API) S3Store {
 		TemporaryDirectory:    "",
 		uploadSemaphore:       semaphore.New(10),
 		requestDurationMetric: requestDurationMetric,
+		diskWriteDurationMetric: diskWriteDurationMetric,
 	}
 }
 
@@ -239,6 +249,7 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) {
 
 func (store S3Store) RegisterMetrics(registry prometheus.Registerer) {
 	registry.MustRegister(store.requestDurationMetric)
+	registry.MustRegister(store.diskWriteDurationMetric)
 }
 
 func (store S3Store) observeRequestDuration(start time.Time, label string) {
@@ -433,7 +444,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
 	numParts := len(parts)
 	nextPartNum := int64(numParts + 1)
 
-	partProducer, fileChan := newS3PartProducer(src, store.MaxBufferedParts, store.TemporaryDirectory)
+	partProducer, fileChan := newS3PartProducer(src, store.MaxBufferedParts, store.TemporaryDirectory, store.diskWriteDurationMetric)
 	defer partProducer.stop()
 	go partProducer.produce(optimalPartSize)
 
@@ -4,6 +4,9 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
 )
 
 // s3PartProducer converts a stream of bytes from the reader into a stream of files on disk
@@ -13,6 +16,7 @@ type s3PartProducer struct {
 	done  chan struct{}
 	err   error
 	r     io.Reader
+	diskWriteDurationMetric prometheus.Summary
 }
 
 type fileChunk struct {
@@ -20,7 +24,7 @@ type fileChunk struct {
 	size int64
 }
 
-func newS3PartProducer(source io.Reader, backlog int64, tmpDir string) (s3PartProducer, <-chan fileChunk) {
+func newS3PartProducer(source io.Reader, backlog int64, tmpDir string, diskWriteDurationMetric prometheus.Summary) (s3PartProducer, <-chan fileChunk) {
 	fileChan := make(chan fileChunk, backlog)
 	doneChan := make(chan struct{})
 
@@ -29,6 +33,7 @@ func newS3PartProducer(source io.Reader, backlog int64, tmpDir string) (s3PartPr
 		done:  doneChan,
 		files: fileChan,
 		r:     source,
+		diskWriteDurationMetric: diskWriteDurationMetric,
 	}
 
 	return partProducer, fileChan
@@ -78,6 +83,8 @@ func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
 	}
 
 	limitedReader := io.LimitReader(spp.r, size)
+	start := time.Now()
+
 	n, err := io.Copy(file, limitedReader)
 	if err != nil {
 		return fileChunk{}, false, err
@@ -91,6 +98,10 @@ func (spp *s3PartProducer) nextPart(size int64) (fileChunk, bool, error) {
 		return fileChunk{}, false, nil
 	}
 
+	elapsed := time.Now().Sub(start)
+	ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
+	spp.diskWriteDurationMetric.Observe(ms)
+
 	// Seek to the beginning of the file
 	file.Seek(0, 0)
 
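The timing added to nextPart truncates the elapsed time to whole milliseconds by integer-dividing nanoseconds. Below is a sketch of that conversion in isolation, using time.Since as a shorthand for time.Now().Sub; the Duration.Milliseconds helper shown at the end assumes Go 1.13 or newer, which is an assumption about the build environment rather than something this commit requires:

package main

import (
	"fmt"
	"time"
)

// millis mirrors the conversion used in nextPart: truncate the elapsed
// duration to whole milliseconds before handing it to Observe.
func millis(d time.Duration) float64 {
	return float64(d.Nanoseconds() / int64(time.Millisecond))
}

func main() {
	start := time.Now()
	time.Sleep(12 * time.Millisecond)

	elapsed := time.Since(start) // equivalent to time.Now().Sub(start)
	fmt.Println(millis(elapsed))

	// On Go 1.13+, the same truncated value is available directly:
	fmt.Println(float64(elapsed.Milliseconds()))
}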