From 14faaafc6769448d4fb6837194cd4ea6dd3baa1e Mon Sep 17 00:00:00 2001 From: Marius Date: Sun, 26 May 2019 21:38:19 +0200 Subject: [PATCH 1/3] chore: Upgrade Docker image to Go 1.12 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index ffd48e9..5789cad 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.7-alpine AS builder +FROM golang:1.12-alpine AS builder # Copy in the git repo from the build context COPY . /go/src/github.com/tus/tusd/ From d23be46d7a236714c451601f87fe6bbab7776d45 Mon Sep 17 00:00:00 2001 From: Marius Date: Sun, 26 May 2019 20:56:51 +0100 Subject: [PATCH 2/3] core: Add ability to stop upload from post-receive hook (#279) * First implementation of stopping an upload from the server * Remove unnecessary json tag * Use golang.org/x/net/context for support in Go < 1.7 --- cmd/tusd/cli/composer.go | 2 +- cmd/tusd/cli/flags.go | 42 ++++++++++++++-------------- cmd/tusd/cli/hooks.go | 35 ++++++++++++++--------- datastore.go | 17 ++++++++++++ patch_test.go | 60 ++++++++++++++++++++++++++++++++++++++++ unrouted_handler.go | 57 +++++++++++++++++++++++++++++++++++--- utils_test.go | 5 ++++ 7 files changed, 180 insertions(+), 38 deletions(-) diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index 2aad4d1..99eb962 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -5,10 +5,10 @@ import ( "github.com/tus/tusd" "github.com/tus/tusd/filestore" + "github.com/tus/tusd/gcsstore" "github.com/tus/tusd/limitedstore" "github.com/tus/tusd/memorylocker" "github.com/tus/tusd/s3store" - "github.com/tus/tusd/gcsstore" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index b4e713a..5bb82de 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -6,26 +6,27 @@ import ( ) var Flags struct { - HttpHost string - HttpPort string - HttpSock string - MaxSize int64 - UploadDir string - StoreSize int64 - Basepath string - Timeout int64 - S3Bucket string - S3ObjectPrefix string - S3Endpoint string - GCSBucket string - FileHooksDir string - HttpHooksEndpoint string - HttpHooksRetry int - HttpHooksBackoff int - ShowVersion bool - ExposeMetrics bool - MetricsPath string - BehindProxy bool + HttpHost string + HttpPort string + HttpSock string + MaxSize int64 + UploadDir string + StoreSize int64 + Basepath string + Timeout int64 + S3Bucket string + S3ObjectPrefix string + S3Endpoint string + GCSBucket string + FileHooksDir string + HttpHooksEndpoint string + HttpHooksRetry int + HttpHooksBackoff int + HooksStopUploadCode int + ShowVersion bool + ExposeMetrics bool + MetricsPath string + BehindProxy bool FileHooksInstalled bool HttpHooksInstalled bool @@ -48,6 +49,7 @@ func ParseFlags() { flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to") flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout") flag.IntVar(&Flags.HttpHooksBackoff, "hooks-http-backoff", 1, "Number of seconds to wait before retrying each retry") + flag.IntVar(&Flags.HooksStopUploadCode, "hooks-stop-code", 0, "Return code from post-receive hook which causes tusd to stop and delete the current upload. A zero value means that no uploads will be stopped") flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information") flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage") flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible") diff --git a/cmd/tusd/cli/hooks.go b/cmd/tusd/cli/hooks.go index 77683d4..e00734f 100644 --- a/cmd/tusd/cli/hooks.go +++ b/cmd/tusd/cli/hooks.go @@ -109,13 +109,14 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt output := []byte{} err := error(nil) + returnCode := 0 if Flags.FileHooksInstalled { - output, err = invokeFileHook(name, typ, info, captureOutput) + output, returnCode, err = invokeFileHook(name, typ, info, captureOutput) } if Flags.HttpHooksInstalled { - output, err = invokeHttpHook(name, typ, info, captureOutput) + output, returnCode, err = invokeHttpHook(name, typ, info, captureOutput) } if err != nil { @@ -125,18 +126,24 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", info.ID) } + if typ == HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode { + logEv(stdout, "HookStopUpload", "id", info.ID) + + info.StopUpload() + } + return output, err } -func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) { +func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) { jsonInfo, err := json.Marshal(info) if err != nil { - return nil, err + return nil, 0, err } req, err := http.NewRequest("POST", Flags.HttpHooksEndpoint, bytes.NewBuffer(jsonInfo)) if err != nil { - return nil, err + return nil, 0, err } req.Header.Set("Hook-Name", name) @@ -152,27 +159,27 @@ func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput resp, err := client.Do(req) if err != nil { - return nil, err + return nil, 0, err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, err + return nil, 0, err } if resp.StatusCode >= http.StatusBadRequest { - return body, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body} + return body, resp.StatusCode, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body} } if captureOutput { - return body, err + return body, resp.StatusCode, err } - return nil, err + return nil, resp.StatusCode, err } -func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) { +func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) { hookPath := Flags.FileHooksDir + string(os.PathSeparator) + name cmd := exec.Command(hookPath) env := os.Environ() @@ -182,7 +189,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput jsonInfo, err := json.Marshal(info) if err != nil { - return nil, err + return nil, 0, err } reader := bytes.NewReader(jsonInfo) @@ -208,5 +215,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput err = nil } - return output, err + returnCode := cmd.ProcessState.ExitCode() + + return output, returnCode, err } diff --git a/datastore.go b/datastore.go index c6e4249..2030b89 100644 --- a/datastore.go +++ b/datastore.go @@ -2,6 +2,8 @@ package tusd import ( "io" + + "golang.org/x/net/context" ) type MetaData map[string]string @@ -25,6 +27,21 @@ type FileInfo struct { // ordered slice containing the ids of the uploads of which the final upload // will consist after concatenation. PartialUploads []string + + // stopUpload is the cancel function for the upload's context.Context. When + // invoked it will interrupt the writes to DataStore#WriteChunk. + stopUpload context.CancelFunc +} + +// StopUpload interrupts an running upload from the server-side. This means that +// the current request body is closed, so that the data store does not get any +// more data. Furthermore, a response is sent to notify the client of the +// interrupting and the upload is terminated (if supported by the data store), +// so the upload cannot be resumed anymore. +func (f FileInfo) StopUpload() { + if f.stopUpload != nil { + f.stopUpload() + } } type DataStore interface { diff --git a/patch_test.go b/patch_test.go index 750c345..5fd3762 100644 --- a/patch_test.go +++ b/patch_test.go @@ -485,4 +485,64 @@ func TestPatch(t *testing.T) { _, more := <-c a.False(more) }) + + SubTest(t, "StopUpload", func(t *testing.T, store *MockFullDataStore) { + gomock.InOrder( + store.EXPECT().GetInfo("yes").Return(FileInfo{ + ID: "yes", + Offset: 0, + Size: 100, + }, nil), + store.EXPECT().WriteChunk("yes", int64(0), NewReaderMatcher("first ")).Return(int64(6), http.ErrBodyReadAfterClose), + store.EXPECT().Terminate("yes").Return(nil), + ) + + handler, _ := NewHandler(Config{ + DataStore: store, + NotifyUploadProgress: true, + }) + + c := make(chan FileInfo) + handler.UploadProgress = c + + reader, writer := io.Pipe() + a := assert.New(t) + + go func() { + writer.Write([]byte("first ")) + + info := <-c + info.StopUpload() + + // Wait a short time to ensure that the goroutine in the PATCH + // handler has received and processed the stop event. + <-time.After(10 * time.Millisecond) + + // Assert that the "request body" has been closed. + _, err := writer.Write([]byte("second ")) + a.Equal(err, io.ErrClosedPipe) + + // Close the upload progress handler so that the main goroutine + // can exit properly after waiting for this goroutine to finish. + close(handler.UploadProgress) + }() + + (&httpTest{ + Method: "PATCH", + URL: "yes", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Content-Type": "application/offset+octet-stream", + "Upload-Offset": "0", + }, + ReqBody: reader, + Code: http.StatusBadRequest, + ResHeader: map[string]string{ + "Upload-Offset": "", + }, + }).Run(handler, t) + + _, more := <-c + a.False(more) + }) } diff --git a/unrouted_handler.go b/unrouted_handler.go index 96c1b7c..1c39841 100644 --- a/unrouted_handler.go +++ b/unrouted_handler.go @@ -14,6 +14,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/net/context" ) const UploadLengthDeferred = "1" @@ -70,6 +72,7 @@ var ( ErrModifyFinal = NewHTTPError(errors.New("modifying a final upload is not allowed"), http.StatusForbidden) ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest) ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest) + ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest) ) // UnroutedHandler exposes methods to handle requests as part of the tus protocol, @@ -535,14 +538,46 @@ func (handler *UnroutedHandler) writeChunk(id string, info FileInfo, w http.Resp // Limit the data read from the request's body to the allowed maximum reader := io.LimitReader(r.Body, maxSize) + // We use a context object to allow the hook system to cancel an upload + uploadCtx, stopUpload := context.WithCancel(context.Background()) + info.stopUpload = stopUpload + // terminateUpload specifies whether the upload should be deleted after + // the write has finished + terminateUpload := false + // Cancel the context when the function exits to ensure that the goroutine + // is properly cleaned up + defer stopUpload() + + go func() { + // Interrupt the Read() call from the request body + <-uploadCtx.Done() + terminateUpload = true + r.Body.Close() + }() + if handler.config.NotifyUploadProgress { - var stop chan<- struct{} - reader, stop = handler.sendProgressMessages(info, reader) - defer close(stop) + var stopProgressEvents chan<- struct{} + reader, stopProgressEvents = handler.sendProgressMessages(info, reader) + defer close(stopProgressEvents) } var err error bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader) + if terminateUpload && handler.composer.UsesTerminater { + if terminateErr := handler.terminateUpload(id, info); terminateErr != nil { + // We only log this error and not show it to the user since this + // termination error is not relevant to the uploading client + handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error()) + } + } + + // The error "http: invalid Read on closed Body" is returned if we stop the upload + // while the data store is still reading. Since this is an implementation detail, + // we replace this error with a message saying that the upload has been stopped. + if err == http.ErrBodyReadAfterClose { + err = ErrUploadStoppedByServer + } + if err != nil { return err } @@ -735,19 +770,33 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) } } - err = handler.composer.Terminater.Terminate(id) + err = handler.terminateUpload(id, info) if err != nil { handler.sendError(w, r, err) return } handler.sendResp(w, r, http.StatusNoContent) +} + +// terminateUpload passes a given upload to the DataStore's Terminater, +// send the corresponding upload info on the TerminatedUploads channnel +// and updates the statistics. +// Note the the info argument is only needed if the terminated uploads +// notifications are enabled. +func (handler *UnroutedHandler) terminateUpload(id string, info FileInfo) error { + err := handler.composer.Terminater.Terminate(id) + if err != nil { + return err + } if handler.config.NotifyTerminatedUploads { handler.TerminatedUploads <- info } handler.Metrics.incUploadsTerminated() + + return nil } // Send the error in the response body. The status code will be looked up in diff --git a/utils_test.go b/utils_test.go index 550e87e..6c52820 100644 --- a/utils_test.go +++ b/utils_test.go @@ -99,6 +99,11 @@ func (m readerMatcher) Matches(x interface{}) bool { } bytes, err := ioutil.ReadAll(input) + // Handle closed pipes similar to how EOF are handled by ioutil.ReadAll, + // we handle this error as if the stream ended normally. + if err == io.ErrClosedPipe { + err = nil + } if err != nil { panic(err) } From b89c337b1bd1a97128978e68f9ee2ba328b53011 Mon Sep 17 00:00:00 2001 From: Marius Date: Sun, 26 May 2019 22:28:14 +0200 Subject: [PATCH 3/3] cli: Log full HTTP address for uploads --- cmd/tusd/cli/serve.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/tusd/cli/serve.go b/cmd/tusd/cli/serve.go index a1e1754..9732b8d 100644 --- a/cmd/tusd/cli/serve.go +++ b/cmd/tusd/cli/serve.go @@ -74,6 +74,10 @@ func Serve() { stderr.Fatalf("Unable to create listener: %s", err) } + if Flags.HttpSock == "" { + stdout.Printf("You can now upload files to: http://%s%s", address, basepath) + } + if err = http.Serve(listener, nil); err != nil { stderr.Fatalf("Unable to serve: %s", err) }