parent
c1eddef26a
commit
b2273d4153
|
@ -1,7 +1,6 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
@ -78,23 +77,16 @@ func newMetrics() Metrics {
|
||||||
// ErrorsTotalMap stores the counters for the different HTTP errors.
|
// ErrorsTotalMap stores the counters for the different HTTP errors.
|
||||||
type ErrorsTotalMap struct {
|
type ErrorsTotalMap struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
counter map[simpleHTTPError]*uint64
|
counter map[ErrorsTotalMapEntry]*uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type simpleHTTPError struct {
|
type ErrorsTotalMapEntry struct {
|
||||||
Message string
|
ErrorCode string
|
||||||
StatusCode int
|
StatusCode int
|
||||||
}
|
}
|
||||||
|
|
||||||
func simplifyHTTPError(err HTTPError) simpleHTTPError {
|
|
||||||
return simpleHTTPError{
|
|
||||||
Message: err.Error(),
|
|
||||||
StatusCode: err.StatusCode(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newErrorsTotalMap() *ErrorsTotalMap {
|
func newErrorsTotalMap() *ErrorsTotalMap {
|
||||||
m := make(map[simpleHTTPError]*uint64, 20)
|
m := make(map[ErrorsTotalMapEntry]*uint64, 20)
|
||||||
return &ErrorsTotalMap{
|
return &ErrorsTotalMap{
|
||||||
counter: m,
|
counter: m,
|
||||||
}
|
}
|
||||||
|
@ -103,7 +95,11 @@ func newErrorsTotalMap() *ErrorsTotalMap {
|
||||||
// retrievePointerFor returns (after creating it if necessary) the pointer to
|
// retrievePointerFor returns (after creating it if necessary) the pointer to
|
||||||
// the counter for the error.
|
// the counter for the error.
|
||||||
func (e *ErrorsTotalMap) retrievePointerFor(err HTTPError) *uint64 {
|
func (e *ErrorsTotalMap) retrievePointerFor(err HTTPError) *uint64 {
|
||||||
serr := simplifyHTTPError(err)
|
serr := ErrorsTotalMapEntry{
|
||||||
|
ErrorCode: err.ErrorCode(),
|
||||||
|
StatusCode: err.StatusCode(),
|
||||||
|
}
|
||||||
|
|
||||||
e.lock.RLock()
|
e.lock.RLock()
|
||||||
ptr, ok := e.counter[serr]
|
ptr, ok := e.counter[serr]
|
||||||
e.lock.RUnlock()
|
e.lock.RUnlock()
|
||||||
|
@ -124,12 +120,11 @@ func (e *ErrorsTotalMap) retrievePointerFor(err HTTPError) *uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load retrieves the map of the counter pointers atomically
|
// Load retrieves the map of the counter pointers atomically
|
||||||
func (e *ErrorsTotalMap) Load() map[HTTPError]*uint64 {
|
func (e *ErrorsTotalMap) Load() map[ErrorsTotalMapEntry]*uint64 {
|
||||||
m := make(map[HTTPError]*uint64, len(e.counter))
|
m := make(map[ErrorsTotalMapEntry]*uint64, len(e.counter))
|
||||||
e.lock.RLock()
|
e.lock.RLock()
|
||||||
for err, ptr := range e.counter {
|
for err, ptr := range e.counter {
|
||||||
httpErr := NewHTTPError(errors.New(err.Message), err.StatusCode)
|
m[err] = ptr
|
||||||
m[httpErr] = ptr
|
|
||||||
}
|
}
|
||||||
e.lock.RUnlock()
|
e.lock.RUnlock()
|
||||||
|
|
||||||
|
|
|
@ -629,7 +629,7 @@ func TestPatch(t *testing.T) {
|
||||||
ResHeader: map[string]string{
|
ResHeader: map[string]string{
|
||||||
"Upload-Offset": "",
|
"Upload-Offset": "",
|
||||||
},
|
},
|
||||||
ResBody: "upload has been stopped by server\n",
|
ResBody: "ERR_UPLOAD_STOPPED: upload has been stopped by server\n",
|
||||||
}).Run(handler, t)
|
}).Run(handler, t)
|
||||||
|
|
||||||
_, more := <-c
|
_, more := <-c
|
||||||
|
@ -680,7 +680,7 @@ func TestPatch(t *testing.T) {
|
||||||
ResHeader: map[string]string{
|
ResHeader: map[string]string{
|
||||||
"Upload-Offset": "",
|
"Upload-Offset": "",
|
||||||
},
|
},
|
||||||
ResBody: "an error while reading the body\n",
|
ResBody: "ERR_INTERNAL_SERVER_ERROR: an error while reading the body\n",
|
||||||
}).Run(handler, t)
|
}).Run(handler, t)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package handler
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
|
@ -29,19 +28,29 @@ var (
|
||||||
// See the net/http package for standardized status codes.
|
// See the net/http package for standardized status codes.
|
||||||
type HTTPError interface {
|
type HTTPError interface {
|
||||||
error
|
error
|
||||||
|
ErrorCode() string
|
||||||
StatusCode() int
|
StatusCode() int
|
||||||
Body() []byte
|
Body() []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpError struct {
|
type httpError struct {
|
||||||
error
|
errorCode string
|
||||||
|
message string
|
||||||
statusCode int
|
statusCode int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (err httpError) Error() string {
|
||||||
|
return err.errorCode + ": " + err.message
|
||||||
|
}
|
||||||
|
|
||||||
func (err httpError) StatusCode() int {
|
func (err httpError) StatusCode() int {
|
||||||
return err.statusCode
|
return err.statusCode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (err httpError) ErrorCode() string {
|
||||||
|
return err.errorCode
|
||||||
|
}
|
||||||
|
|
||||||
func (err httpError) Body() []byte {
|
func (err httpError) Body() []byte {
|
||||||
return []byte(err.Error())
|
return []byte(err.Error())
|
||||||
}
|
}
|
||||||
|
@ -49,30 +58,32 @@ func (err httpError) Body() []byte {
|
||||||
// NewHTTPError adds the given status code to the provided error and returns
|
// NewHTTPError adds the given status code to the provided error and returns
|
||||||
// the new error instance. The status code may be used in corresponding HTTP
|
// the new error instance. The status code may be used in corresponding HTTP
|
||||||
// responses. See the net/http package for standardized status codes.
|
// responses. See the net/http package for standardized status codes.
|
||||||
func NewHTTPError(err error, statusCode int) HTTPError {
|
func NewHTTPError(errCode string, message string, statusCode int) HTTPError {
|
||||||
return httpError{err, statusCode}
|
return httpError{errCode, message, statusCode}
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrUnsupportedVersion = NewHTTPError(errors.New("unsupported version"), http.StatusPreconditionFailed)
|
ErrUnsupportedVersion = NewHTTPError("ERR_UNSUPPORTED_VERSION", "missing, invalid or unsupported Tus-Resumable header", http.StatusPreconditionFailed)
|
||||||
ErrMaxSizeExceeded = NewHTTPError(errors.New("maximum size exceeded"), http.StatusRequestEntityTooLarge)
|
ErrMaxSizeExceeded = NewHTTPError("ERR_MAX_SIZE_EXCEEDED", "maximum size exceeded", http.StatusRequestEntityTooLarge)
|
||||||
ErrInvalidContentType = NewHTTPError(errors.New("missing or invalid Content-Type header"), http.StatusBadRequest)
|
ErrInvalidContentType = NewHTTPError("ERR_INVALID_CONTENT_TYPE", "missing or invalid Content-Type header", http.StatusBadRequest)
|
||||||
ErrInvalidUploadLength = NewHTTPError(errors.New("missing or invalid Upload-Length header"), http.StatusBadRequest)
|
ErrInvalidUploadLength = NewHTTPError("ERR_INVALID_UPLOAD_LENGTH", "missing or invalid Upload-Length header", http.StatusBadRequest)
|
||||||
ErrInvalidOffset = NewHTTPError(errors.New("missing or invalid Upload-Offset header"), http.StatusBadRequest)
|
ErrInvalidOffset = NewHTTPError("ERR_INVALID_OFFSET", "missing or invalid Upload-Offset header", http.StatusBadRequest)
|
||||||
ErrNotFound = NewHTTPError(errors.New("upload not found"), http.StatusNotFound)
|
ErrNotFound = NewHTTPError("ERR_UPLOAD_NOT_FOUND", "upload not found", http.StatusNotFound)
|
||||||
ErrFileLocked = NewHTTPError(errors.New("file currently locked"), http.StatusLocked)
|
ErrFileLocked = NewHTTPError("ERR_UPLOAD_LOCKED", "file currently locked", http.StatusLocked)
|
||||||
ErrMismatchOffset = NewHTTPError(errors.New("mismatched offset"), http.StatusConflict)
|
ErrMismatchOffset = NewHTTPError("ERR_MISMATCHED_OFFSET", "mismatched offset", http.StatusConflict)
|
||||||
ErrSizeExceeded = NewHTTPError(errors.New("resource's size exceeded"), http.StatusRequestEntityTooLarge)
|
ErrSizeExceeded = NewHTTPError("ERR_UPLOAD_SIZE_EXCEEDED", "upload's size exceeded", http.StatusRequestEntityTooLarge)
|
||||||
ErrNotImplemented = NewHTTPError(errors.New("feature not implemented"), http.StatusNotImplemented)
|
ErrNotImplemented = NewHTTPError("ERR_NOT_IMPLEMENTED", "feature not implemented", http.StatusNotImplemented)
|
||||||
ErrUploadNotFinished = NewHTTPError(errors.New("one of the partial uploads is not finished"), http.StatusBadRequest)
|
ErrUploadNotFinished = NewHTTPError("ERR_UPLOAD_NOT_FINISHED", "one of the partial uploads is not finished", http.StatusBadRequest)
|
||||||
ErrInvalidConcat = NewHTTPError(errors.New("invalid Upload-Concat header"), http.StatusBadRequest)
|
ErrInvalidConcat = NewHTTPError("ERR_INVALID_CONCAT", "invalid Upload-Concat header", http.StatusBadRequest)
|
||||||
ErrModifyFinal = NewHTTPError(errors.New("modifying a final upload is not allowed"), http.StatusForbidden)
|
ErrModifyFinal = NewHTTPError("ERR_MODIFY_FINAL", "modifying a final upload is not allowed", http.StatusForbidden)
|
||||||
ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest)
|
ErrUploadLengthAndUploadDeferLength = NewHTTPError("ERR_AMBIGUOUS_UPLOAD_LENGTH", "provided both Upload-Length and Upload-Defer-Length", http.StatusBadRequest)
|
||||||
ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest)
|
ErrInvalidUploadDeferLength = NewHTTPError("ERR_INVALID_UPLOAD_LENGTH_DEFER", "invalid Upload-Defer-Length header", http.StatusBadRequest)
|
||||||
ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest)
|
ErrUploadStoppedByServer = NewHTTPError("ERR_UPLOAD_STOPPED", "upload has been stopped by server", http.StatusBadRequest)
|
||||||
|
|
||||||
errReadTimeout = errors.New("read tcp: i/o timeout")
|
// TODO: These two responses are 500 for backwards compatability. We should discuss
|
||||||
errConnectionReset = errors.New("read tcp: connection reset by peer")
|
// whether it is better to more them to 4XX status codes.
|
||||||
|
ErrReadTimeout = NewHTTPError("ERR_READ_TIMEOUT", "timeout while reading request body", http.StatusInternalServerError)
|
||||||
|
ErrConnectionReset = NewHTTPError("ERR_CONNECTION_RESET", "TCP connection reset by peer", http.StatusInternalServerError)
|
||||||
)
|
)
|
||||||
|
|
||||||
// HTTPRequest contains basic details of an incoming HTTP request.
|
// HTTPRequest contains basic details of an incoming HTTP request.
|
||||||
|
@ -910,14 +921,14 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request
|
||||||
// message looks like: read tcp 127.0.0.1:1080->127.0.0.1:53673: i/o timeout
|
// message looks like: read tcp 127.0.0.1:1080->127.0.0.1:53673: i/o timeout
|
||||||
// Therefore, we use a common error message for all of them.
|
// Therefore, we use a common error message for all of them.
|
||||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||||
err = errReadTimeout
|
err = ErrReadTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// Errors for connnection resets also contain TCP details, we don't need, e.g:
|
// Errors for connnection resets also contain TCP details, we don't need, e.g:
|
||||||
// read tcp 127.0.0.1:1080->127.0.0.1:10023: read: connection reset by peer
|
// read tcp 127.0.0.1:1080->127.0.0.1:10023: read: connection reset by peer
|
||||||
// Therefore, we also trim those down.
|
// Therefore, we also trim those down.
|
||||||
if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
|
if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
|
||||||
err = errConnectionReset
|
err = ErrConnectionReset
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Decide if we should handle this in here, in body_reader or not at all.
|
// TODO: Decide if we should handle this in here, in body_reader or not at all.
|
||||||
|
@ -941,7 +952,8 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request
|
||||||
|
|
||||||
statusErr, ok := err.(HTTPError)
|
statusErr, ok := err.(HTTPError)
|
||||||
if !ok {
|
if !ok {
|
||||||
statusErr = NewHTTPError(err, http.StatusInternalServerError)
|
handler.log("InternalServerError", "message", err.Error(), "method", r.Method, "path", r.URL.Path, "requestId", getRequestId(r))
|
||||||
|
statusErr = NewHTTPError("ERR_INTERNAL_SERVER_ERROR", err.Error(), http.StatusInternalServerError)
|
||||||
}
|
}
|
||||||
|
|
||||||
reason := append(statusErr.Body(), '\n')
|
reason := append(statusErr.Body(), '\n')
|
||||||
|
@ -949,12 +961,13 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request
|
||||||
reason = nil
|
reason = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Allow JSON response
|
||||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||||
w.Header().Set("Content-Length", strconv.Itoa(len(reason)))
|
w.Header().Set("Content-Length", strconv.Itoa(len(reason)))
|
||||||
w.WriteHeader(statusErr.StatusCode())
|
w.WriteHeader(statusErr.StatusCode())
|
||||||
w.Write(reason)
|
w.Write(reason)
|
||||||
|
|
||||||
handler.log("ResponseOutgoing", "status", strconv.Itoa(statusErr.StatusCode()), "method", r.Method, "path", r.URL.Path, "error", err.Error(), "requestId", getRequestId(r))
|
handler.log("ResponseOutgoing", "status", strconv.Itoa(statusErr.StatusCode()), "method", r.Method, "path", r.URL.Path, "error", statusErr.ErrorCode(), "requestId", getRequestId(r))
|
||||||
|
|
||||||
handler.Metrics.incErrorsTotal(statusErr)
|
handler.Metrics.incErrorsTotal(statusErr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ var (
|
||||||
errorsTotalDesc = prometheus.NewDesc(
|
errorsTotalDesc = prometheus.NewDesc(
|
||||||
"tusd_errors_total",
|
"tusd_errors_total",
|
||||||
"Total number of errors per status.",
|
"Total number of errors per status.",
|
||||||
[]string{"status", "message"}, nil)
|
[]string{"status", "code"}, nil)
|
||||||
bytesReceivedDesc = prometheus.NewDesc(
|
bytesReceivedDesc = prometheus.NewDesc(
|
||||||
"tusd_bytes_received",
|
"tusd_bytes_received",
|
||||||
"Number of bytes received for uploads.",
|
"Number of bytes received for uploads.",
|
||||||
|
@ -79,8 +79,8 @@ func (c Collector) Collect(metrics chan<- prometheus.Metric) {
|
||||||
errorsTotalDesc,
|
errorsTotalDesc,
|
||||||
prometheus.CounterValue,
|
prometheus.CounterValue,
|
||||||
float64(atomic.LoadUint64(valuePtr)),
|
float64(atomic.LoadUint64(valuePtr)),
|
||||||
strconv.Itoa(httpError.StatusCode()),
|
strconv.Itoa(httpError.StatusCode),
|
||||||
httpError.Error(),
|
httpError.ErrorCode,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -719,7 +719,7 @@ func (upload s3Upload) GetReader(ctx context.Context) (io.Reader, error) {
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// The multipart upload still exists, which means we cannot download it yet
|
// The multipart upload still exists, which means we cannot download it yet
|
||||||
return nil, handler.NewHTTPError(errors.New("cannot stream non-finished upload"), http.StatusBadRequest)
|
return nil, handler.NewHTTPError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
if isAwsError(err, "NoSuchUpload") {
|
if isAwsError(err, "NoSuchUpload") {
|
||||||
|
|
Loading…
Reference in New Issue