Allow exposing metrics for Prometheus and Co.

This commit is contained in:
Marius 2016-05-24 17:04:28 +02:00
parent f5c790f792
commit c80c8b1962
4 changed files with 238 additions and 1 deletions

View File

@ -18,12 +18,15 @@ import (
"github.com/tus/tusd/filestore" "github.com/tus/tusd/filestore"
"github.com/tus/tusd/limitedstore" "github.com/tus/tusd/limitedstore"
"github.com/tus/tusd/memorylocker" "github.com/tus/tusd/memorylocker"
"github.com/tus/tusd/prometheuscollector"
"github.com/tus/tusd/s3store" "github.com/tus/tusd/s3store"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/prometheus/client_golang/prometheus"
) )
var VersionName = "n/a" var VersionName = "n/a"
@ -40,6 +43,7 @@ var timeout int64
var s3Bucket string var s3Bucket string
var hooksDir string var hooksDir string
var version bool var version bool
var exposeMetrics bool
var stdout = log.New(os.Stdout, "[tusd] ", 0) var stdout = log.New(os.Stdout, "[tusd] ", 0)
var stderr = log.New(os.Stderr, "[tusd] ", 0) var stderr = log.New(os.Stderr, "[tusd] ", 0)
@ -48,6 +52,11 @@ var hookInstalled bool
var greeting string var greeting string
var openConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "connections_open_total",
Help: "Current number of open connections.",
})
func init() { func init() {
flag.StringVar(&httpHost, "host", "0.0.0.0", "Host to bind HTTP server to") flag.StringVar(&httpHost, "host", "0.0.0.0", "Host to bind HTTP server to")
flag.StringVar(&httpPort, "port", "1080", "Port to bind HTTP server to") flag.StringVar(&httpPort, "port", "1080", "Port to bind HTTP server to")
@ -59,6 +68,7 @@ func init() {
flag.StringVar(&s3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)") flag.StringVar(&s3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)")
flag.StringVar(&hooksDir, "hooks-dir", "", "") flag.StringVar(&hooksDir, "hooks-dir", "", "")
flag.BoolVar(&version, "version", false, "Print tusd version information") flag.BoolVar(&version, "version", false, "Print tusd version information")
flag.BoolVar(&exposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage")
flag.Parse() flag.Parse()
@ -160,6 +170,13 @@ func main() {
} }
}() }()
if exposeMetrics {
prometheus.MustRegister(openConnections)
prometheus.MustRegister(prometheuscollector.New(handler.Metrics))
http.Handle("/metrics", prometheus.Handler())
}
// Do not display the greeting if the tusd handler will be mounted at the root // Do not display the greeting if the tusd handler will be mounted at the root
// path. Else this would cause a "multiple registrations for /" panic. // path. Else this would cause a "multiple registrations for /" panic.
if basepath != "/" { if basepath != "/" {
@ -238,6 +255,9 @@ func (l *Listener) Accept() (net.Conn, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
go openConnections.Inc()
tc := &Conn{ tc := &Conn{
Conn: c, Conn: c,
ReadTimeout: l.ReadTimeout, ReadTimeout: l.ReadTimeout,
@ -284,6 +304,11 @@ func (c *Conn) Write(b []byte) (int, error) {
return c.Conn.Write(b) return c.Conn.Write(b)
} }
func (c *Conn) Close() error {
go openConnections.Dec()
return c.Conn.Close()
}
func NewListener(addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) { func NewListener(addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) {
l, err := net.Listen("tcp", addr) l, err := net.Listen("tcp", addr)
if err != nil { if err != nil {

87
metrics.go Normal file
View File

@ -0,0 +1,87 @@
package tusd
import (
"sync/atomic"
)
// Metrics provides numbers about the usage of the tusd handler. Since these may
// be accessed from multiple goroutines, it is necessary to read and modify them
// atomically using the functions exposed in the sync/atomic package, such as
// atomic.LoadUint64. In addition the maps must not be modified to prevent data
// races.
type Metrics struct {
// RequestTotal counts the number of incoming requests per method
RequestsTotal map[string]*uint64
// ErrorsTotal counts the number of returned errors by their message
ErrorsTotal map[string]*uint64
BytesReceived *uint64
UploadsFinished *uint64
UploadsCreated *uint64
UploadsTerminated *uint64
}
// incRequestsTotal increases the counter for this request method atomically by
// one. The method must be one of GET, HEAD, POST, PATCH, DELETE.
func (m Metrics) incRequestsTotal(method string) {
atomic.AddUint64(m.RequestsTotal[method], 1)
}
// incErrorsTotal increases the counter for this error atomically by one.
func (m Metrics) incErrorsTotal(err error) {
msg := err.Error()
if _, ok := ErrStatusCodes[err]; !ok {
msg = "system error"
}
atomic.AddUint64(m.ErrorsTotal[msg], 1)
}
// incBytesReceived increases the number of received bytes atomically be the
// specified number.
func (m Metrics) incBytesReceived(delta uint64) {
atomic.AddUint64(m.BytesReceived, delta)
}
// incUploadsFinished increases the counter for finished uploads atomically by one.
func (m Metrics) incUploadsFinished() {
atomic.AddUint64(m.UploadsFinished, 1)
}
// incUploadsCreated increases the counter for completed uploads atomically by one.
func (m Metrics) incUploadsCreated() {
atomic.AddUint64(m.UploadsCreated, 1)
}
// incUploadsTerminated increases the counter for completed uploads atomically by one.
func (m Metrics) incUploadsTerminated() {
atomic.AddUint64(m.UploadsTerminated, 1)
}
func newMetrics() Metrics {
return Metrics{
RequestsTotal: map[string]*uint64{
"GET": new(uint64),
"HEAD": new(uint64),
"POST": new(uint64),
"PATCH": new(uint64),
"DELETE": new(uint64),
},
ErrorsTotal: newErrorsTotalMap(),
BytesReceived: new(uint64),
UploadsFinished: new(uint64),
UploadsCreated: new(uint64),
UploadsTerminated: new(uint64),
}
}
func newErrorsTotalMap() map[string]*uint64 {
m := make(map[string]*uint64, len(ErrStatusCodes)+1)
for err, _ := range ErrStatusCodes {
m[err.Error()] = new(uint64)
}
m["system error"] = new(uint64)
return m
}

View File

@ -0,0 +1,108 @@
// package prometheuscollector allows to expose metrics for Prometheus.
//
// Using the provided collector, you can easily expose metrics for tusd in the
// Prometheus exposition format (https://prometheus.io/docs/instrumenting/exposition_formats/):
//
// handler, err := tusd.NewHandler(…)
// collector := prometheuscollector.New(handler.Metrics)
// prometheus.MustRegister(collector)
package prometheuscollector
import (
"sync/atomic"
"github.com/tus/tusd"
"github.com/prometheus/client_golang/prometheus"
)
var (
requestsTotalDesc = prometheus.NewDesc(
"tusd_requests_total",
"Total number of requests served by tusd per method.",
[]string{"method"}, nil)
errorsTotalDesc = prometheus.NewDesc(
"tusd_errors_total",
"Total number of erorrs per cause.",
[]string{"cause"}, nil)
bytesReceivedDesc = prometheus.NewDesc(
"tusd_bytes_received",
"Number of bytes received for uploads.",
nil, nil)
uploadsCreatedDesc = prometheus.NewDesc(
"tusd_uploads_created",
"Number of created uploads.",
nil, nil)
uploadsFinishedDesc = prometheus.NewDesc(
"tusd_uploads_finished",
"Number of finished uploads.",
nil, nil)
uploadsTerminatedDesc = prometheus.NewDesc(
"tusd_uploads_terminated",
"Number of terminted uploads.",
nil, nil)
)
type Collector struct {
metrics tusd.Metrics
}
// New creates a new collector which read froms the provided Metrics struct.
func New(metrics tusd.Metrics) Collector {
return Collector{
metrics: metrics,
}
}
func (_ Collector) Describe(descs chan<- *prometheus.Desc) {
descs <- requestsTotalDesc
descs <- errorsTotalDesc
descs <- bytesReceivedDesc
descs <- uploadsCreatedDesc
descs <- uploadsFinishedDesc
descs <- uploadsTerminatedDesc
}
func (c Collector) Collect(metrics chan<- prometheus.Metric) {
for method, valuePtr := range c.metrics.RequestsTotal {
metrics <- prometheus.MustNewConstMetric(
requestsTotalDesc,
prometheus.GaugeValue,
float64(atomic.LoadUint64(valuePtr)),
method,
)
}
for error, valuePtr := range c.metrics.ErrorsTotal {
metrics <- prometheus.MustNewConstMetric(
errorsTotalDesc,
prometheus.GaugeValue,
float64(atomic.LoadUint64(valuePtr)),
error,
)
}
metrics <- prometheus.MustNewConstMetric(
bytesReceivedDesc,
prometheus.CounterValue,
float64(atomic.LoadUint64(c.metrics.BytesReceived)),
)
metrics <- prometheus.MustNewConstMetric(
uploadsFinishedDesc,
prometheus.CounterValue,
float64(atomic.LoadUint64(c.metrics.UploadsFinished)),
)
metrics <- prometheus.MustNewConstMetric(
uploadsCreatedDesc,
prometheus.CounterValue,
float64(atomic.LoadUint64(c.metrics.UploadsCreated)),
)
metrics <- prometheus.MustNewConstMetric(
uploadsTerminatedDesc,
prometheus.CounterValue,
float64(atomic.LoadUint64(c.metrics.UploadsTerminated)),
)
}

View File

@ -75,6 +75,8 @@ type UnroutedHandler struct {
// happen if the NotifyTerminatedUploads field is set to true in the Config // happen if the NotifyTerminatedUploads field is set to true in the Config
// structure. // structure.
TerminatedUploads chan FileInfo TerminatedUploads chan FileInfo
// Metrics provides numbers of the usage for this handler.
Metrics Metrics
} }
// NewUnroutedHandler creates a new handler without routing using the given // NewUnroutedHandler creates a new handler without routing using the given
@ -104,6 +106,7 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) {
TerminatedUploads: make(chan FileInfo), TerminatedUploads: make(chan FileInfo),
logger: config.Logger, logger: config.Logger,
extensions: extensions, extensions: extensions,
Metrics: newMetrics(),
} }
return handler, nil return handler, nil
@ -123,7 +126,10 @@ func (handler *UnroutedHandler) Middleware(h http.Handler) http.Handler {
r.Method = newMethod r.Method = newMethod
} }
go handler.logger.Println(r.Method, r.URL.Path) go func() {
handler.logger.Println(r.Method, r.URL.Path)
handler.Metrics.incRequestsTotal(r.Method)
}()
header := w.Header() header := w.Header()
@ -251,11 +257,15 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
info.ID = id info.ID = id
handler.CompleteUploads <- info handler.CompleteUploads <- info
} }
go handler.Metrics.incUploadsFinished()
} }
url := handler.absFileURL(r, id) url := handler.absFileURL(r, id)
w.Header().Set("Location", url) w.Header().Set("Location", url)
w.WriteHeader(http.StatusCreated) w.WriteHeader(http.StatusCreated)
go handler.Metrics.incUploadsCreated()
} }
// HeadFile returns the length and offset for the HEAD request // HeadFile returns the length and offset for the HEAD request
@ -388,6 +398,7 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
// Send new offset to client // Send new offset to client
newOffset := offset + bytesWritten newOffset := offset + bytesWritten
w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10)) w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10))
go handler.Metrics.incBytesReceived(uint64(bytesWritten))
// If the upload is completed, ... // If the upload is completed, ...
if newOffset == info.Size { if newOffset == info.Size {
@ -404,6 +415,8 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
info.Offset = newOffset info.Offset = newOffset
handler.CompleteUploads <- info handler.CompleteUploads <- info
} }
go handler.Metrics.incUploadsFinished()
} }
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
@ -510,6 +523,8 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
if handler.config.NotifyTerminatedUploads { if handler.config.NotifyTerminatedUploads {
handler.TerminatedUploads <- info handler.TerminatedUploads <- info
} }
go handler.Metrics.incUploadsTerminated()
} }
// Send the error in the response body. The status code will be looked up in // Send the error in the response body. The status code will be looked up in
@ -534,6 +549,8 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request
w.Header().Set("Content-Length", strconv.Itoa(len(reason))) w.Header().Set("Content-Length", strconv.Itoa(len(reason)))
w.WriteHeader(status) w.WriteHeader(status)
w.Write([]byte(reason)) w.Write([]byte(reason))
go handler.Metrics.incErrorsTotal(err)
} }
// Make an absolute URLs to the given upload id. If the base path is absolute // Make an absolute URLs to the given upload id. If the base path is absolute