diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index a44b59a..ee1db66 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -83,6 +83,7 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/tus/tusd/internal/semaphore" "github.com/tus/tusd/internal/uid" "github.com/tus/tusd/pkg/handler" @@ -162,8 +163,25 @@ type S3Store struct { // uploadSemaphore limits the number of concurrent multipart part uploads to S3. uploadSemaphore semaphore.Semaphore + + // requestDurationMetric holds the prometheus instance for storing the request durations. + requestDurationMetric *prometheus.SummaryVec } +// The labels to use for observing and storing request duration. One label per operation. +const ( + metricGetInfoObject = "get_info_object" + metricPutInfoObject = "put_info_object" + metricCreateMultipartUpload = "create_multipart_upload" + metricCompleteMultipartUpload = "complete_multipart_upload" + metricUploadPart = "upload_part" + metricListParts = "list_parts" + metricHeadPartObject = "head_part_object" + metricGetPartObject = "get_part_object" + metricPutPartObject = "put_part_object" + metricDeletePartObject = "delete_part_object" +) + type S3API interface { PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opt ...request.Option) (*s3.PutObjectOutput, error) ListPartsWithContext(ctx context.Context, input *s3.ListPartsInput, opt ...request.Option) (*s3.ListPartsOutput, error) @@ -184,17 +202,26 @@ type s3APIForPresigning interface { // New constructs a new storage using the supplied bucket and service object. func New(bucket string, service S3API) S3Store { + requestDurationMetric := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "tusd_s3_request_duration_ms", + Help: "Duration of requests sent to S3 in milliseconds per operation", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"operation"}) + // TODO: Do not register them globally but to a specific prometheus handler + prometheus.MustRegister(requestDurationMetric) + return S3Store{ - Bucket: bucket, - Service: service, - MaxPartSize: 5 * 1024 * 1024 * 1024, - MinPartSize: 5 * 1024 * 1024, - PreferredPartSize: 50 * 1024 * 1024, - MaxMultipartParts: 10000, - MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, - MaxBufferedParts: 20, - TemporaryDirectory: "", - uploadSemaphore: semaphore.New(10), + Bucket: bucket, + Service: service, + MaxPartSize: 5 * 1024 * 1024 * 1024, + MinPartSize: 5 * 1024 * 1024, + PreferredPartSize: 50 * 1024 * 1024, + MaxMultipartParts: 10000, + MaxObjectSize: 5 * 1024 * 1024 * 1024 * 1024, + MaxBufferedParts: 20, + TemporaryDirectory: "", + uploadSemaphore: semaphore.New(10), + requestDurationMetric: requestDurationMetric, } } @@ -212,6 +239,13 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) { composer.UseLengthDeferrer(store) } +func (store S3Store) observeRequestDuration(start time.Time, label string) { + elapsed := time.Now().Sub(start) + ms := float64(elapsed.Nanoseconds() / int64(time.Millisecond)) + + store.requestDurationMetric.WithLabelValues(label).Observe(ms) +} + type s3Upload struct { id string store *S3Store @@ -258,11 +292,13 @@ func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (hand } // Create the actual multipart upload + t := time.Now() res, err := store.Service.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(uploadId), Metadata: metadata, }) + store.observeRequestDuration(t, metricCreateMultipartUpload) if err != nil { return nil, fmt.Errorf("s3store: unable to create multipart upload:\n%s", err) } @@ -315,12 +351,14 @@ func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) er } // Create object on S3 containing information about the file + t := time.Now() _, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{ Bucket: aws.String(store.Bucket), Key: store.metadataKeyWithPrefix(uploadId + ".info"), Body: bytes.NewReader(infoJson), ContentLength: aws.Int64(int64(len(infoJson))), }) + store.observeRequestDuration(t, metricPutInfoObject) return err } @@ -421,6 +459,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re defer upload.store.uploadSemaphore.Release() defer wg.Done() + t := time.Now() uploadPartInput := &s3.UploadPartInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(uploadId), @@ -428,6 +467,7 @@ func (upload *s3Upload) uploadParts(ctx context.Context, offset int64, src io.Re PartNumber: aws.Int64(part.number), } etag, err := upload.putPartForUpload(ctx, uploadPartInput, file, part.size) + store.observeRequestDuration(t, metricUploadPart) if err != nil { uploadErr = err } else { @@ -558,6 +598,7 @@ func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, pa go func() { defer wg.Done() + t := time.Now() // Get file info stored in separate object var res *s3.GetObjectOutput @@ -565,6 +606,7 @@ func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, pa Bucket: aws.String(store.Bucket), Key: store.metadataKeyWithPrefix(uploadId + ".info"), }) + store.observeRequestDuration(t, metricGetInfoObject) if infoErr == nil { infoErr = json.NewDecoder(res.Body).Decode(&info) } @@ -780,6 +822,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { } } + t := time.Now() _, err = store.Service.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ Bucket: aws.String(store.Bucket), Key: store.keyWithPrefix(uploadId), @@ -788,6 +831,7 @@ func (upload s3Upload) FinishUpload(ctx context.Context) error { Parts: completedParts, }, }) + store.observeRequestDuration(t, metricCompleteMultipartUpload) return err } @@ -942,6 +986,8 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Pa partMarker := int64(0) for { + t := time.Now() + // Get uploaded parts listPtr, err := store.Service.ListPartsWithContext(ctx, &s3.ListPartsInput{ Bucket: aws.String(store.Bucket), @@ -949,6 +995,7 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Pa UploadId: aws.String(multipartId), PartNumberMarker: aws.Int64(partMarker), }) + store.observeRequestDuration(t, metricListParts) if err != nil { return nil, err } @@ -972,6 +1019,7 @@ func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3Pa } func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, error) { + t := time.Now() incompleteUploadObject, err := store.getIncompletePartForUpload(ctx, uploadId) if err != nil { return nil, err @@ -988,6 +1036,7 @@ func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, upload } n, err := io.Copy(partFile, incompleteUploadObject.Body) + store.observeRequestDuration(t, metricGetPartObject) if err != nil { return nil, err } @@ -1017,10 +1066,12 @@ func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId st } func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId string) (int64, error) { + t := time.Now() obj, err := store.Service.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ Bucket: aws.String(store.Bucket), Key: store.metadataKeyWithPrefix(uploadId + ".part"), }) + store.observeRequestDuration(t, metricHeadPartObject) if err != nil { if isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "NotFound") || isAwsError(err, "AccessDenied") { @@ -1035,19 +1086,23 @@ func (store S3Store) headIncompletePartForUpload(ctx context.Context, uploadId s func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error { defer cleanUpTempFile(file) + t := time.Now() _, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{ Bucket: aws.String(store.Bucket), Key: store.metadataKeyWithPrefix(uploadId + ".part"), Body: file, }) + store.observeRequestDuration(t, metricPutPartObject) return err } func (store S3Store) deleteIncompletePartForUpload(ctx context.Context, uploadId string) error { + t := time.Now() _, err := store.Service.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(store.Bucket), Key: store.metadataKeyWithPrefix(uploadId + ".part"), }) + store.observeRequestDuration(t, metricPutPartObject) return err }