s3store: Expose metrics about request durations

Marius 2021-05-24 12:00:20 +02:00
parent ce54ff8b1f
commit 0f24a80ea5
1 changed file with 65 additions and 10 deletions


@@ -83,6 +83,7 @@ import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/tus/tusd/internal/semaphore"
"github.com/tus/tusd/internal/uid"
"github.com/tus/tusd/pkg/handler"
@@ -162,8 +163,25 @@ type S3Store struct {
// uploadSemaphore limits the number of concurrent multipart part uploads to S3.
uploadSemaphore semaphore.Semaphore
// requestDurationMetric holds the Prometheus summary vector for recording the request durations.
requestDurationMetric *prometheus.SummaryVec
}
// The label values used when observing and storing request durations. One value per S3 operation.
const (
metricGetInfoObject = "get_info_object"
metricPutInfoObject = "put_info_object"
metricCreateMultipartUpload = "create_multipart_upload"
metricCompleteMultipartUpload = "complete_multipart_upload"
metricUploadPart = "upload_part"
metricListParts = "list_parts"
metricHeadPartObject = "head_part_object"
metricGetPartObject = "get_part_object"
metricPutPartObject = "put_part_object"
metricDeletePartObject = "delete_part_object"
)
type S3API interface {
PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opt ...request.Option) (*s3.PutObjectOutput, error)
ListPartsWithContext(ctx context.Context, input *s3.ListPartsInput, opt ...request.Option) (*s3.ListPartsOutput, error)
@@ -184,6 +202,14 @@ type s3APIForPresigning interface {
// New constructs a new storage using the supplied bucket and service object.
func New(bucket string, service S3API) S3Store {
requestDurationMetric := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "tusd_s3_request_duration_ms",
Help: "Duration of requests sent to S3 in milliseconds per operation",
// Each key is a target quantile; each value is its allowed absolute error.
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"operation"})
// TODO: Do not register the metric globally, but with a specific Prometheus registry (a possible approach is sketched below)
prometheus.MustRegister(requestDurationMetric)
return S3Store{
Bucket: bucket,
Service: service,
@@ -195,6 +221,7 @@ func New(bucket string, service S3API) S3Store {
MaxBufferedParts: 20,
TemporaryDirectory: "",
uploadSemaphore: semaphore.New(10),
requestDurationMetric: requestDurationMetric,
}
}
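
The TODO above could later be addressed by letting callers pass their own registry. A minimal sketch, assuming a hypothetical NewWithRegisterer constructor in the same package (not part of this commit):

// NewWithRegisterer is a hypothetical variant of New that additionally
// registers the metric with a caller-provided prometheus.Registerer,
// so consumers are not limited to the global default registry.
func NewWithRegisterer(bucket string, service S3API, reg prometheus.Registerer) S3Store {
	store := New(bucket, service)
	reg.MustRegister(store.requestDurationMetric)
	return store
}

Registering the same collector with a second registry is permitted by client_golang, since duplicate checks are kept per registry.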
@@ -212,6 +239,13 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) {
composer.UseLengthDeferrer(store)
}
func (store S3Store) observeRequestDuration(start time.Time, label string) {
elapsed := time.Since(start)
// Observe the duration in whole milliseconds; the integer division
// drops sub-millisecond precision.
ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond))
store.requestDurationMetric.WithLabelValues(label).Observe(ms)
}
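
Because the helper receives the start time as an argument, it also composes with defer, whose arguments are evaluated when the defer statement is reached. A usage sketch (doSomeRequest is a hypothetical method, not code from this commit):

func (store S3Store) doSomeRequest(ctx context.Context) error {
	// time.Now() is evaluated here; the deferred call runs on return
	// and therefore observes the full elapsed time of the method.
	defer store.observeRequestDuration(time.Now(), metricUploadPart)
	// ... issue the S3 request ...
	return nil
}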
type s3Upload struct {
id string
store *S3Store
@@ -258,11 +292,13 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand
}
// Create the actual multipart upload
t := time.Now()
res, err := store.Service.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId),
Metadata: metadata,
})
store.observeRequestDuration(t, metricCreateMultipartUpload)
if err != nil {
return nil, fmt.Errorf("s3store: unable to create multipart upload:\n%s", err)
}
@@ -315,12 +351,14 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er
}
// Create object on S3 containing information about the file
t := time.Now()
_, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".info"),
Body: bytes.NewReader(infoJson),
ContentLength: aws.Int64(int64(len(infoJson))),
})
store.observeRequestDuration(t, metricPutInfoObject)
return err
}
@@ -421,6 +459,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
defer upload.store.uploadSemaphore.Release()
defer wg.Done()
t := time.Now()
uploadPartInput := &s3.UploadPartInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId),
@@ -428,6 +467,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re
PartNumber: aws.Int64(part.number),
}
etag, err := upload.putPartForUpload(ctx, uploadPartInput, file, part.size)
store.observeRequestDuration(t, metricUploadPart)
if err != nil {
uploadErr = err
} else {
@@ -558,6 +598,7 @@ func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, pa
go func() {
defer wg.Done()
t := time.Now()
// Get file info stored in separate object
var res *s3.GetObjectOutput
@@ -565,6 +606,7 @@ func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, pa
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".info"),
})
store.observeRequestDuration(t, metricGetInfoObject)
if infoErr == nil {
infoErr = json.NewDecoder(res.Body).Decode(&info)
}
@@ -780,6 +822,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
}
}
t := time.Now()
_, err = store.Service.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(store.Bucket),
Key: store.keyWithPrefix(uploadId),
@@ -788,6 +831,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error {
Parts: completedParts,
},
})
store.observeRequestDuration(t, metricCompleteMultipartUpload)
return err
}
@@ -942,6 +986,8 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Pa
partMarker := int64(0)
for {
t := time.Now()
// Get uploaded parts
listPtr, err := store.Service.ListPartsWithContext(ctx, &s3.ListPartsInput{
Bucket: aws.String(store.Bucket),
@@ -949,6 +995,7 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Pa
UploadId: aws.String(multipartId),
PartNumberMarker: aws.Int64(partMarker),
})
store.observeRequestDuration(t, metricListParts)
if err != nil {
return nil, err
}
@@ -972,6 +1019,7 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Pa
}
func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, error) {
t := time.Now()
incompleteUploadObject, err := store.getIncompletePartForUpload(ctx, uploadId)
if err != nil {
return nil, err
@@ -988,6 +1036,7 @@ func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, upload
}
n, err := io.Copy(partFile, incompleteUploadObject.Body)
store.observeRequestDuration(t, metricGetPartObject)
if err != nil {
return nil, err
}
@@ -1017,10 +1066,12 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st
}
func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId string) (int64, error) {
t := time.Now()
obj, err := store.Service.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".part"),
})
store.observeRequestDuration(t, metricHeadPartObject)
if err != nil {
if isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "NotFound") || isAwsError(err, "AccessDenied") {
@@ -1035,19 +1086,23 @@ func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId s
func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
defer cleanUpTempFile(file)
t := time.Now()
_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".part"),
Body: file,
})
store.observeRequestDuration(t, metricPutPartObject)
return err
}
func (store S3Store) deleteIncompletePartForUpload(ctx context.Context, uploadId string) error {
t := time.Now()
_, err := store.Service.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(store.Bucket),
Key: store.metadataKeyWithPrefix(uploadId + ".part"),
})
store.observeRequestDuration(t, metricDeletePartObject)
return err
}
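
Since the metric is registered with the global default registry, a consumer only needs to serve that registry to scrape it. A minimal sketch, assuming aws-sdk-go v1 and a placeholder bucket name:

package main

import (
	"log"
	"net/http"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tus/tusd/pkg/s3store"
)

func main() {
	// Placeholder session; real code would configure region and credentials.
	sess := session.Must(session.NewSession())
	store := s3store.New("my-bucket", s3.New(sess))
	_ = store // wire the store into a tusd handler here

	// tusd_s3_request_duration_ms appears here with an "operation" label
	// and the 0.5, 0.9 and 0.99 quantiles configured above.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9100", nil))
}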