core: Handle errors from reading request body centrally

This commit is contained in:
Marius 2021-04-26 10:08:37 +02:00
parent 97602c3d62
commit 6d987aa226
5 changed files with 151 additions and 62 deletions

View File

@ -163,16 +163,7 @@ func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.R
n, err := io.Copy(file, src)
// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for FileStore it's not important whether the stream has ended
// on purpose or accidentally.
if err == io.ErrUnexpectedEOF {
err = nil
}
upload.info.Offset += n
return n, err
}

View File

@ -0,0 +1,53 @@
package handler
import (
"io"
"sync/atomic"
)
// bodyReader is an io.Reader, which is intended to wrap the request
// body reader. If an error occurr during reading the request body, it
// will not return this error to the reading entity, but instead store
// the error and close the io.Reader, so that the error can be checked
// afterwards. This is helpful, so that the stores do not have to handle
// the error but this can instead be done in the handler.
// In addition, the bodyReader keeps track of how many bytes were read.
type bodyReader struct {
reader io.Reader
err error
bytesCounter int64
}
func newBodyReader(r io.Reader) *bodyReader {
return &bodyReader{
reader: r,
}
}
func (r *bodyReader) Read(b []byte) (int, error) {
if r.err != nil {
return 0, io.EOF
}
n, err := r.reader.Read(b)
atomic.AddInt64(&r.bytesCounter, int64(n))
r.err = err
if err == io.EOF {
return n, io.EOF
} else {
return n, nil
}
}
func (r bodyReader) hasError() error {
if r.err == io.EOF {
return nil
}
return r.err
}
func (r *bodyReader) bytesRead() int64 {
return atomic.LoadInt64(&r.bytesCounter)
}

View File

@ -2,6 +2,7 @@ package handler_test
import (
"context"
"errors"
"io"
"io/ioutil"
"net/http"
@ -577,7 +578,7 @@ func TestPatch(t *testing.T) {
Offset: 0,
Size: 100,
}, nil),
upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), http.ErrBodyReadAfterClose),
upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil),
store.EXPECT().AsTerminatableUpload(upload).Return(upload),
upload.EXPECT().Terminate(context.Background()),
)
@ -626,9 +627,58 @@ func TestPatch(t *testing.T) {
ResHeader: map[string]string{
"Upload-Offset": "",
},
ResBody: "upload has been stopped by server\n",
}).Run(handler, t)
_, more := <-c
a.False(more)
})
SubTest(t, "BodyReadError", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
// This test ensures that errors that occur while reading the request body are not forwarded to the
// storage backend but still cause an error response to be sent to the client.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
upload := NewMockFullUpload(ctrl)
gomock.InOrder(
store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil),
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{
ID: "yes",
Offset: 0,
Size: 100,
}, nil),
// The reader for WriteChunk must not return an error.
upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil),
)
handler, _ := NewHandler(Config{
StoreComposer: composer,
})
reader, writer := io.Pipe()
a := assert.New(t)
go func() {
writer.Write([]byte("first "))
err := writer.CloseWithError(errors.New("an error while reading the body"))
a.NoError(err)
}()
(&httpTest{
Method: "PATCH",
URL: "yes",
ReqHeader: map[string]string{
"Tus-Resumable": "1.0.0",
"Content-Type": "application/offset+octet-stream",
"Upload-Offset": "0",
},
ReqBody: reader,
Code: http.StatusInternalServerError,
ResHeader: map[string]string{
"Upload-Offset": "",
},
ResBody: "an error while reading the body\n",
}).Run(handler, t)
})
}

View File

@ -12,7 +12,6 @@ import (
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"
)
@ -71,6 +70,9 @@ var (
ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest)
ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest)
ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest)
errReadTimeout = errors.New("read tcp: i/o timeout")
errConnectionReset = errors.New("read tcp: connection reset by peer")
)
// HTTPRequest contains basic details of an incoming HTTP request.
@ -608,11 +610,12 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i
handler.log("ChunkWriteStart", "id", id, "maxSize", i64toa(maxSize), "offset", i64toa(offset))
var bytesWritten int64
var err error
// Prevent a nil pointer dereference when accessing the body which may not be
// available in the case of a malicious request.
if r.Body != nil {
// Limit the data read from the request's body to the allowed maximum
reader := io.LimitReader(r.Body, maxSize)
reader := newBodyReader(io.LimitReader(r.Body, maxSize))
// We use a context object to allow the hook system to cancel an upload
uploadCtx, stopUpload := context.WithCancel(context.Background())
@ -632,12 +635,10 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i
}()
if handler.config.NotifyUploadProgress {
var stopProgressEvents chan<- struct{}
reader, stopProgressEvents = handler.sendProgressMessages(newHookEvent(info, r), reader)
stopProgressEvents := handler.sendProgressMessages(newHookEvent(info, r), reader)
defer close(stopProgressEvents)
}
var err error
bytesWritten, err = upload.WriteChunk(ctx, offset, reader)
if terminateUpload && handler.composer.UsesTerminater {
if terminateErr := handler.terminateUpload(ctx, upload, info, r); terminateErr != nil {
@ -647,20 +648,28 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i
}
}
// The error "http: invalid Read on closed Body" is returned if we stop the upload
// while the data store is still reading. Since this is an implementation detail,
// we replace this error with a message saying that the upload has been stopped.
if err == http.ErrBodyReadAfterClose {
err = ErrUploadStoppedByServer
// If we encountered an error while reading the body from the HTTP request, log it, but only include
// it in the response, if the store did not also return an error.
if bodyErr := reader.hasError(); bodyErr != nil {
handler.log("BodyReadError", "id", id, "error", bodyErr.Error())
if err == nil {
err = bodyErr
}
}
if err != nil {
return err
// If the upload was stopped by the server, send an error response indicating this.
// TODO: Include a custom reason for the end user why the upload was stopped.
if terminateUpload {
err = ErrUploadStoppedByServer
}
}
handler.log("ChunkWriteComplete", "id", id, "bytesWritten", i64toa(bytesWritten))
if err != nil {
return err
}
// Send new offset to client
newOffset := offset + bytesWritten
w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10))
@ -900,16 +909,35 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request
// message looks like: read tcp 127.0.0.1:1080->127.0.0.1:53673: i/o timeout
// Therefore, we use a common error message for all of them.
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
err = errors.New("read tcp: i/o timeout")
err = errReadTimeout
}
// Errors for connection resets also contain TCP details we don't need, e.g.:
// read tcp 127.0.0.1:1080->127.0.0.1:10023: read: connection reset by peer
// Therefore, we also trim those down.
if strings.HasSuffix(err.Error(), "read: connection reset by peer") {
err = errors.New("read tcp: connection reset by peer")
err = errConnectionReset
}
// TODO: Decide if we should handle this in here, in body_reader or not at all.
// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for the handler it's not important whether the stream has ended
// on purpose or accidentally.
//if err == io.ErrUnexpectedEOF {
// err = nil
//}
// TODO: Decide if we want to ignore connection reset errors all together.
// In some cases, the HTTP connection gets reset by the other peer. This is not
// necessarily the tus client but can also be a proxy in front of tusd, e.g. HAProxy 2
// is known to reset the connection to tusd, when the tus client closes the connection.
// To avoid erroring out in this case and losing the uploaded data, we can ignore
// the error here without causing harm.
//if strings.Contains(err.Error(), "read: connection reset by peer") {
// err = nil
//}
statusErr, ok := err.(HTTPError)
if !ok {
statusErr = NewHTTPError(err, http.StatusInternalServerError)
@ -952,39 +980,26 @@ func (handler *UnroutedHandler) absFileURL(r *http.Request, id string) string {
return url
}
type progressWriter struct {
Offset int64
}
func (w *progressWriter) Write(b []byte) (int, error) {
atomic.AddInt64(&w.Offset, int64(len(b)))
return len(b), nil
}
// sendProgressMessage will send a notification over the UploadProgress channel
// every second, indicating how much data has been transfered to the server.
// It will stop sending these instances once the returned channel has been
// closed. The returned reader should be used to read the request body.
func (handler *UnroutedHandler) sendProgressMessages(hook HookEvent, reader io.Reader) (io.Reader, chan<- struct{}) {
// closed.
func (handler *UnroutedHandler) sendProgressMessages(hook HookEvent, reader *bodyReader) chan<- struct{} {
previousOffset := int64(0)
progress := &progressWriter{
Offset: hook.Upload.Offset,
}
stop := make(chan struct{}, 1)
reader = io.TeeReader(reader, progress)
go func() {
for {
select {
case <-stop:
hook.Upload.Offset = atomic.LoadInt64(&progress.Offset)
hook.Upload.Offset = reader.bytesRead()
if hook.Upload.Offset != previousOffset {
handler.UploadProgress <- hook
previousOffset = hook.Upload.Offset
}
return
case <-time.After(1 * time.Second):
hook.Upload.Offset = atomic.LoadInt64(&progress.Offset)
hook.Upload.Offset = reader.bytesRead()
if hook.Upload.Offset != previousOffset {
handler.UploadProgress <- hook
previousOffset = hook.Upload.Offset
@ -993,7 +1008,7 @@ func (handler *UnroutedHandler) sendProgressMessages(hook HookEvent, reader io.R
}
}()
return reader, stop
return stop
}
// getHostAndProtocol extracts the host and used protocol (either HTTP or HTTPS)

View File

@ -341,26 +341,6 @@ func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) {
limitedReader := io.LimitReader(spp.r, size)
n, err := io.Copy(file, limitedReader)
// If the HTTP PATCH request gets interrupted in the middle (e.g. because
// the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF.
// However, for S3Store it's not important whether the stream has ended
// on purpose or accidentally. Therefore, we ignore this error to not
// prevent the remaining chunk to be stored on S3.
if err == io.ErrUnexpectedEOF {
err = nil
}
// In some cases, the HTTP connection gets reset by the other peer. This is not
// necessarily the tus client but can also be a proxy in front of tusd, e.g. HAProxy 2
// is known to reset the connection to tusd, when the tus client closes the connection.
// To avoid erroring out in this case and losing the uploaded data, we can ignore
// the error here without causing harm.
// TODO: Move this into unrouted_handler.go, so other stores can also take advantage of this.
if err != nil && strings.Contains(err.Error(), "read: connection reset by peer") {
err = nil
}
if err != nil {
return nil, err
}