From c80c8b1962e98b56b35ff0d1fc401b5b8c9e0f7e Mon Sep 17 00:00:00 2001 From: Marius Date: Tue, 24 May 2016 17:04:28 +0200 Subject: [PATCH] Allow exposing metrics for Prometheus and Co. --- cmd/tusd/main.go | 25 +++++ metrics.go | 87 +++++++++++++++++ prometheuscollector/prometheuscollector.go | 108 +++++++++++++++++++++ unrouted_handler.go | 19 +++- 4 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 metrics.go create mode 100644 prometheuscollector/prometheuscollector.go diff --git a/cmd/tusd/main.go b/cmd/tusd/main.go index 604cc61..0e98708 100644 --- a/cmd/tusd/main.go +++ b/cmd/tusd/main.go @@ -18,12 +18,15 @@ import ( "github.com/tus/tusd/filestore" "github.com/tus/tusd/limitedstore" "github.com/tus/tusd/memorylocker" + "github.com/tus/tusd/prometheuscollector" "github.com/tus/tusd/s3store" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + + "github.com/prometheus/client_golang/prometheus" ) var VersionName = "n/a" @@ -40,6 +43,7 @@ var timeout int64 var s3Bucket string var hooksDir string var version bool +var exposeMetrics bool var stdout = log.New(os.Stdout, "[tusd] ", 0) var stderr = log.New(os.Stderr, "[tusd] ", 0) @@ -48,6 +52,11 @@ var hookInstalled bool var greeting string +var openConnections = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "connections_open_total", + Help: "Current number of open connections.", +}) + func init() { flag.StringVar(&httpHost, "host", "0.0.0.0", "Host to bind HTTP server to") flag.StringVar(&httpPort, "port", "1080", "Port to bind HTTP server to") @@ -59,6 +68,7 @@ func init() { flag.StringVar(&s3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)") flag.StringVar(&hooksDir, "hooks-dir", "", "") flag.BoolVar(&version, "version", false, "Print tusd version information") + flag.BoolVar(&exposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage") flag.Parse() @@ -160,6 +170,13 @@ func main() { } }() + if exposeMetrics { + prometheus.MustRegister(openConnections) + prometheus.MustRegister(prometheuscollector.New(handler.Metrics)) + + http.Handle("/metrics", prometheus.Handler()) + } + // Do not display the greeting if the tusd handler will be mounted at the root // path. Else this would cause a "multiple registrations for /" panic. if basepath != "/" { @@ -238,6 +255,9 @@ func (l *Listener) Accept() (net.Conn, error) { if err != nil { return nil, err } + + go openConnections.Inc() + tc := &Conn{ Conn: c, ReadTimeout: l.ReadTimeout, @@ -284,6 +304,11 @@ func (c *Conn) Write(b []byte) (int, error) { return c.Conn.Write(b) } +func (c *Conn) Close() error { + go openConnections.Dec() + return c.Conn.Close() +} + func NewListener(addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) { l, err := net.Listen("tcp", addr) if err != nil { diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..6384001 --- /dev/null +++ b/metrics.go @@ -0,0 +1,87 @@ +package tusd + +import ( + "sync/atomic" +) + +// Metrics provides numbers about the usage of the tusd handler. Since these may +// be accessed from multiple goroutines, it is necessary to read and modify them +// atomically using the functions exposed in the sync/atomic package, such as +// atomic.LoadUint64. In addition the maps must not be modified to prevent data +// races. +type Metrics struct { + // RequestTotal counts the number of incoming requests per method + RequestsTotal map[string]*uint64 + // ErrorsTotal counts the number of returned errors by their message + ErrorsTotal map[string]*uint64 + BytesReceived *uint64 + UploadsFinished *uint64 + UploadsCreated *uint64 + UploadsTerminated *uint64 +} + +// incRequestsTotal increases the counter for this request method atomically by +// one. The method must be one of GET, HEAD, POST, PATCH, DELETE. +func (m Metrics) incRequestsTotal(method string) { + atomic.AddUint64(m.RequestsTotal[method], 1) +} + +// incErrorsTotal increases the counter for this error atomically by one. +func (m Metrics) incErrorsTotal(err error) { + msg := err.Error() + if _, ok := ErrStatusCodes[err]; !ok { + msg = "system error" + } + + atomic.AddUint64(m.ErrorsTotal[msg], 1) +} + +// incBytesReceived increases the number of received bytes atomically be the +// specified number. +func (m Metrics) incBytesReceived(delta uint64) { + atomic.AddUint64(m.BytesReceived, delta) +} + +// incUploadsFinished increases the counter for finished uploads atomically by one. +func (m Metrics) incUploadsFinished() { + atomic.AddUint64(m.UploadsFinished, 1) +} + +// incUploadsCreated increases the counter for completed uploads atomically by one. +func (m Metrics) incUploadsCreated() { + atomic.AddUint64(m.UploadsCreated, 1) +} + +// incUploadsTerminated increases the counter for completed uploads atomically by one. +func (m Metrics) incUploadsTerminated() { + atomic.AddUint64(m.UploadsTerminated, 1) +} + +func newMetrics() Metrics { + return Metrics{ + RequestsTotal: map[string]*uint64{ + "GET": new(uint64), + "HEAD": new(uint64), + "POST": new(uint64), + "PATCH": new(uint64), + "DELETE": new(uint64), + }, + ErrorsTotal: newErrorsTotalMap(), + BytesReceived: new(uint64), + UploadsFinished: new(uint64), + UploadsCreated: new(uint64), + UploadsTerminated: new(uint64), + } +} + +func newErrorsTotalMap() map[string]*uint64 { + m := make(map[string]*uint64, len(ErrStatusCodes)+1) + + for err, _ := range ErrStatusCodes { + m[err.Error()] = new(uint64) + } + + m["system error"] = new(uint64) + + return m +} diff --git a/prometheuscollector/prometheuscollector.go b/prometheuscollector/prometheuscollector.go new file mode 100644 index 0000000..25c8943 --- /dev/null +++ b/prometheuscollector/prometheuscollector.go @@ -0,0 +1,108 @@ +// package prometheuscollector allows to expose metrics for Prometheus. +// +// Using the provided collector, you can easily expose metrics for tusd in the +// Prometheus exposition format (https://prometheus.io/docs/instrumenting/exposition_formats/): +// +// handler, err := tusd.NewHandler(…) +// collector := prometheuscollector.New(handler.Metrics) +// prometheus.MustRegister(collector) +package prometheuscollector + +import ( + "sync/atomic" + + "github.com/tus/tusd" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + requestsTotalDesc = prometheus.NewDesc( + "tusd_requests_total", + "Total number of requests served by tusd per method.", + []string{"method"}, nil) + errorsTotalDesc = prometheus.NewDesc( + "tusd_errors_total", + "Total number of erorrs per cause.", + []string{"cause"}, nil) + bytesReceivedDesc = prometheus.NewDesc( + "tusd_bytes_received", + "Number of bytes received for uploads.", + nil, nil) + uploadsCreatedDesc = prometheus.NewDesc( + "tusd_uploads_created", + "Number of created uploads.", + nil, nil) + uploadsFinishedDesc = prometheus.NewDesc( + "tusd_uploads_finished", + "Number of finished uploads.", + nil, nil) + uploadsTerminatedDesc = prometheus.NewDesc( + "tusd_uploads_terminated", + "Number of terminted uploads.", + nil, nil) +) + +type Collector struct { + metrics tusd.Metrics +} + +// New creates a new collector which read froms the provided Metrics struct. +func New(metrics tusd.Metrics) Collector { + return Collector{ + metrics: metrics, + } +} + +func (_ Collector) Describe(descs chan<- *prometheus.Desc) { + descs <- requestsTotalDesc + descs <- errorsTotalDesc + descs <- bytesReceivedDesc + descs <- uploadsCreatedDesc + descs <- uploadsFinishedDesc + descs <- uploadsTerminatedDesc +} + +func (c Collector) Collect(metrics chan<- prometheus.Metric) { + for method, valuePtr := range c.metrics.RequestsTotal { + metrics <- prometheus.MustNewConstMetric( + requestsTotalDesc, + prometheus.GaugeValue, + float64(atomic.LoadUint64(valuePtr)), + method, + ) + } + + for error, valuePtr := range c.metrics.ErrorsTotal { + metrics <- prometheus.MustNewConstMetric( + errorsTotalDesc, + prometheus.GaugeValue, + float64(atomic.LoadUint64(valuePtr)), + error, + ) + } + + metrics <- prometheus.MustNewConstMetric( + bytesReceivedDesc, + prometheus.CounterValue, + float64(atomic.LoadUint64(c.metrics.BytesReceived)), + ) + + metrics <- prometheus.MustNewConstMetric( + uploadsFinishedDesc, + prometheus.CounterValue, + float64(atomic.LoadUint64(c.metrics.UploadsFinished)), + ) + + metrics <- prometheus.MustNewConstMetric( + uploadsCreatedDesc, + prometheus.CounterValue, + float64(atomic.LoadUint64(c.metrics.UploadsCreated)), + ) + + metrics <- prometheus.MustNewConstMetric( + uploadsTerminatedDesc, + prometheus.CounterValue, + float64(atomic.LoadUint64(c.metrics.UploadsTerminated)), + ) +} diff --git a/unrouted_handler.go b/unrouted_handler.go index 2484fdf..4610760 100644 --- a/unrouted_handler.go +++ b/unrouted_handler.go @@ -75,6 +75,8 @@ type UnroutedHandler struct { // happen if the NotifyTerminatedUploads field is set to true in the Config // structure. TerminatedUploads chan FileInfo + // Metrics provides numbers of the usage for this handler. + Metrics Metrics } // NewUnroutedHandler creates a new handler without routing using the given @@ -104,6 +106,7 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) { TerminatedUploads: make(chan FileInfo), logger: config.Logger, extensions: extensions, + Metrics: newMetrics(), } return handler, nil @@ -123,7 +126,10 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler { r.Method = newMethod } - go handler.logger.Println(r.Method, r.URL.Path) + go func() { + handler.logger.Println(r.Method, r.URL.Path) + handler.Metrics.incRequestsTotal(r.Method) + }() header := w.Header() @@ -251,11 +257,15 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) info.ID = id handler.CompleteUploads <- info } + + go handler.Metrics.incUploadsFinished() } url := handler.absFileURL(r, id) w.Header().Set("Location", url) w.WriteHeader(http.StatusCreated) + + go handler.Metrics.incUploadsCreated() } // HeadFile returns the length and offset for the HEAD request @@ -388,6 +398,7 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request // Send new offset to client newOffset := offset + bytesWritten w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10)) + go handler.Metrics.incBytesReceived(uint64(bytesWritten)) // If the upload is completed, ... if newOffset == info.Size { @@ -404,6 +415,8 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request info.Offset = newOffset handler.CompleteUploads <- info } + + go handler.Metrics.incUploadsFinished() } w.WriteHeader(http.StatusNoContent) @@ -510,6 +523,8 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) if handler.config.NotifyTerminatedUploads { handler.TerminatedUploads <- info } + + go handler.Metrics.incUploadsTerminated() } // Send the error in the response body. The status code will be looked up in @@ -534,6 +549,8 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request w.Header().Set("Content-Length", strconv.Itoa(len(reason))) w.WriteHeader(status) w.Write([]byte(reason)) + + go handler.Metrics.incErrorsTotal(err) } // Make an absolute URLs to the given upload id. If the base path is absolute