diff --git a/pkg/filestore/filestore.go b/pkg/filestore/filestore.go index 131b90c..1f79279 100644 --- a/pkg/filestore/filestore.go +++ b/pkg/filestore/filestore.go @@ -163,16 +163,7 @@ func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.R n, err := io.Copy(file, src) - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for FileStore it's not important whether the stream has ended - // on purpose or accidentally. - if err == io.ErrUnexpectedEOF { - err = nil - } - upload.info.Offset += n - return n, err } diff --git a/pkg/handler/body_reader.go b/pkg/handler/body_reader.go new file mode 100644 index 0000000..7374b89 --- /dev/null +++ b/pkg/handler/body_reader.go @@ -0,0 +1,53 @@ +package handler + +import ( + "io" + "sync/atomic" +) + +// bodyReader is an io.Reader, which is intended to wrap the request +// body reader. If an error occurs while reading the request body, it +// will not return this error to the reading entity, but instead store +// the error and report the end of the stream, so that the error can be checked +// afterwards. This is helpful, so that the stores do not have to handle +// the error but this can instead be done in the handler. +// In addition, the bodyReader keeps track of how many bytes were read. 
+type bodyReader struct { + reader io.Reader + err error + bytesCounter int64 +} + +func newBodyReader(r io.Reader) *bodyReader { + return &bodyReader{ + reader: r, + } +} + +func (r *bodyReader) Read(b []byte) (int, error) { + if r.err != nil { + return 0, io.EOF + } + + n, err := r.reader.Read(b) + atomic.AddInt64(&r.bytesCounter, int64(n)) + r.err = err + + if err != io.EOF { + // Any other error stays in r.err for hasError; the caller only sees the bytes. + err = nil + } + return n, err +} + +func (r *bodyReader) hasError() error { + if r.err == io.EOF { + return nil + } + + return r.err +} + +func (r *bodyReader) bytesRead() int64 { + return atomic.LoadInt64(&r.bytesCounter) +} diff --git a/pkg/handler/patch_test.go b/pkg/handler/patch_test.go index 79a3fe9..0938e7b 100644 --- a/pkg/handler/patch_test.go +++ b/pkg/handler/patch_test.go @@ -2,6 +2,7 @@ package handler_test import ( "context" + "errors" "io" "io/ioutil" "net/http" @@ -577,7 +578,7 @@ func TestPatch(t *testing.T) { Offset: 0, Size: 100, }, nil), - upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), http.ErrBodyReadAfterClose), + upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil), store.EXPECT().AsTerminatableUpload(upload).Return(upload), upload.EXPECT().Terminate(context.Background()), ) @@ -626,9 +627,58 @@ func TestPatch(t *testing.T) { ResHeader: map[string]string{ "Upload-Offset": "", }, + ResBody: "upload has been stopped by server\n", }).Run(handler, t) _, more := <-c a.False(more) }) + + SubTest(t, "BodyReadError", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + // This test ensures that errors that occur while reading the request body are not forwarded to the + // storage backend but still cause an error response to be sent to the client. + ctrl := gomock.NewController(t) + defer ctrl.Finish() + upload := NewMockFullUpload(ctrl) + + gomock.InOrder( + store.EXPECT().GetUpload(context.Background(), "yes").Return(upload, nil), 
upload.EXPECT().GetInfo(context.Background()).Return(FileInfo{ + ID: "yes", + Offset: 0, + Size: 100, + }, nil), + // The reader for WriteChunk must not return an error. + upload.EXPECT().WriteChunk(context.Background(), int64(0), NewReaderMatcher("first ")).Return(int64(6), nil), + ) + + handler, _ := NewHandler(Config{ + StoreComposer: composer, + }) + + reader, writer := io.Pipe() + a := assert.New(t) + + go func() { + writer.Write([]byte("first ")) + err := writer.CloseWithError(errors.New("an error while reading the body")) + a.NoError(err) + }() + + (&httpTest{ + Method: "PATCH", + URL: "yes", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Content-Type": "application/offset+octet-stream", + "Upload-Offset": "0", + }, + ReqBody: reader, + Code: http.StatusInternalServerError, + ResHeader: map[string]string{ + "Upload-Offset": "", + }, + ResBody: "an error while reading the body\n", + }).Run(handler, t) + }) } diff --git a/pkg/handler/unrouted_handler.go b/pkg/handler/unrouted_handler.go index 81f6c64..d9fadcd 100644 --- a/pkg/handler/unrouted_handler.go +++ b/pkg/handler/unrouted_handler.go @@ -12,7 +12,6 @@ import ( "regexp" "strconv" "strings" - "sync/atomic" "time" ) @@ -71,6 +70,9 @@ var ( ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest) ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest) ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest) + + errReadTimeout = errors.New("read tcp: i/o timeout") + errConnectionReset = errors.New("read tcp: connection reset by peer") ) // HTTPRequest contains basic details of an incoming HTTP request. 
@@ -608,11 +610,12 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i handler.log("ChunkWriteStart", "id", id, "maxSize", i64toa(maxSize), "offset", i64toa(offset)) var bytesWritten int64 + var err error // Prevent a nil pointer dereference when accessing the body which may not be // available in the case of a malicious request. if r.Body != nil { // Limit the data read from the request's body to the allowed maximum - reader := io.LimitReader(r.Body, maxSize) + reader := newBodyReader(io.LimitReader(r.Body, maxSize)) // We use a context object to allow the hook system to cancel an upload uploadCtx, stopUpload := context.WithCancel(context.Background()) @@ -632,12 +635,10 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i }() if handler.config.NotifyUploadProgress { - var stopProgressEvents chan<- struct{} - reader, stopProgressEvents = handler.sendProgressMessages(newHookEvent(info, r), reader) + stopProgressEvents := handler.sendProgressMessages(newHookEvent(info, r), reader) defer close(stopProgressEvents) } - var err error bytesWritten, err = upload.WriteChunk(ctx, offset, reader) if terminateUpload && handler.composer.UsesTerminater { if terminateErr := handler.terminateUpload(ctx, upload, info, r); terminateErr != nil { @@ -647,20 +648,28 @@ func (handler *UnroutedHandler) writeChunk(ctx context.Context, upload Upload, i } } - // The error "http: invalid Read on closed Body" is returned if we stop the upload - // while the data store is still reading. Since this is an implementation detail, - // we replace this error with a message saying that the upload has been stopped. - if err == http.ErrBodyReadAfterClose { - err = ErrUploadStoppedByServer + // If we encountered an error while reading the body from the HTTP request, log it, but only include + // it in the response, if the store did not also return an error. 
+ if bodyErr := reader.hasError(); bodyErr != nil { + handler.log("BodyReadError", "id", id, "error", bodyErr.Error()) + if err == nil { + err = bodyErr + } } - if err != nil { - return err + // If the upload was stopped by the server, send an error response indicating this. + // TODO: Include a custom reason for the end user why the upload was stopped. + if terminateUpload { + err = ErrUploadStoppedByServer } } handler.log("ChunkWriteComplete", "id", id, "bytesWritten", i64toa(bytesWritten)) + if err != nil { + return err + } + // Send new offset to client newOffset := offset + bytesWritten w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10)) @@ -900,16 +909,35 @@ func (handler *UnroutedHandler) sendError(w http.ResponseWriter, r *http.Request // message looks like: read tcp 127.0.0.1:1080->127.0.0.1:53673: i/o timeout // Therefore, we use a common error message for all of them. if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - err = errors.New("read tcp: i/o timeout") + err = errReadTimeout } // Errors for connnection resets also contain TCP details, we don't need, e.g: // read tcp 127.0.0.1:1080->127.0.0.1:10023: read: connection reset by peer // Therefore, we also trim those down. if strings.HasSuffix(err.Error(), "read: connection reset by peer") { - err = errors.New("read tcp: connection reset by peer") + err = errConnectionReset } + // TODO: Decide if we should handle this in here, in body_reader or not at all. + // If the HTTP PATCH request gets interrupted in the middle (e.g. because + // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. + // However, for the handler it's not important whether the stream has ended + // on purpose or accidentally. + //if err == io.ErrUnexpectedEOF { + // err = nil + //} + + // TODO: Decide if we want to ignore connection reset errors all together. + // In some cases, the HTTP connection gets reset by the other peer. 
This is not + // necessarily the tus client but can also be a proxy in front of tusd, e.g. HAProxy 2 + // is known to reset the connection to tusd, when the tus client closes the connection. + // To avoid erroring out in this case and losing the uploaded data, we can ignore + // the error here without causing harm. + //if strings.Contains(err.Error(), "read: connection reset by peer") { + // err = nil + //} + statusErr, ok := err.(HTTPError) if !ok { statusErr = NewHTTPError(err, http.StatusInternalServerError) @@ -952,39 +980,28 @@ func (handler *UnroutedHandler) absFileURL(r *http.Request, id string) string { return url } -type progressWriter struct { - Offset int64 -} - -func (w *progressWriter) Write(b []byte) (int, error) { - atomic.AddInt64(&w.Offset, int64(len(b))) - return len(b), nil -} - // sendProgressMessage will send a notification over the UploadProgress channel // every second, indicating how much data has been transfered to the server. // It will stop sending these instances once the returned channel has been -// closed. The returned reader should be used to read the request body. -func (handler *UnroutedHandler) sendProgressMessages(hook HookEvent, reader io.Reader) (io.Reader, chan<- struct{}) { +// closed. 
+func (handler *UnroutedHandler) sendProgressMessages(hook HookEvent, reader *bodyReader) chan<- struct{} { previousOffset := int64(0) - progress := &progressWriter{ - Offset: hook.Upload.Offset, - } + // The reader only counts the bytes of this request, so keep the offset the request started at. + originalOffset := hook.Upload.Offset stop := make(chan struct{}, 1) - reader = io.TeeReader(reader, progress) go func() { for { select { case <-stop: - hook.Upload.Offset = atomic.LoadInt64(&progress.Offset) + hook.Upload.Offset = originalOffset + reader.bytesRead() if hook.Upload.Offset != previousOffset { handler.UploadProgress <- hook previousOffset = hook.Upload.Offset } return case <-time.After(1 * time.Second): - hook.Upload.Offset = atomic.LoadInt64(&progress.Offset) + hook.Upload.Offset = originalOffset + reader.bytesRead() if hook.Upload.Offset != previousOffset { handler.UploadProgress <- hook previousOffset = hook.Upload.Offset @@ -993,7 +1010,7 @@ func (handler *UnroutedHandler) sendProgressMessages(hook HookEvent, reader io.R } }() - return reader, stop + return stop } // getHostAndProtocol extracts the host and used protocol (either HTTP or HTTPS) diff --git a/pkg/s3store/s3store.go b/pkg/s3store/s3store.go index 3a37104..00d32a6 100644 --- a/pkg/s3store/s3store.go +++ b/pkg/s3store/s3store.go @@ -341,26 +341,6 @@ func (spp *s3PartProducer) nextPart(size int64) (*os.File, error) { limitedReader := io.LimitReader(spp.r, size) n, err := io.Copy(file, limitedReader) - - // If the HTTP PATCH request gets interrupted in the middle (e.g. because - // the user wants to pause the upload), Go's net/http returns an io.ErrUnexpectedEOF. - // However, for S3Store it's not important whether the stream has ended - // on purpose or accidentally. Therefore, we ignore this error to not - // prevent the remaining chunk to be stored on S3. - if err == io.ErrUnexpectedEOF { - err = nil - } - - // In some cases, the HTTP connection gets reset by the other peer. This is not - // necessarily the tus client but can also be a proxy in front of tusd, e.g. 
HAProxy 2 - // is known to reset the connection to tusd, when the tus client closes the connection. - // To avoid erroring out in this case and loosing the uploaded data, we can ignore - // the error here without causing harm. - // TODO: Move this into unrouted_handler.go, so other stores can also take advantage of this. - if err != nil && strings.Contains(err.Error(), "read: connection reset by peer") { - err = nil - } - if err != nil { return nil, err }