diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index 2aad4d1..99eb962 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -5,10 +5,10 @@ import ( "github.com/tus/tusd" "github.com/tus/tusd/filestore" + "github.com/tus/tusd/gcsstore" "github.com/tus/tusd/limitedstore" "github.com/tus/tusd/memorylocker" "github.com/tus/tusd/s3store" - "github.com/tus/tusd/gcsstore" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index b4e713a..5bb82de 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -6,26 +6,27 @@ import ( ) var Flags struct { - HttpHost string - HttpPort string - HttpSock string - MaxSize int64 - UploadDir string - StoreSize int64 - Basepath string - Timeout int64 - S3Bucket string - S3ObjectPrefix string - S3Endpoint string - GCSBucket string - FileHooksDir string - HttpHooksEndpoint string - HttpHooksRetry int - HttpHooksBackoff int - ShowVersion bool - ExposeMetrics bool - MetricsPath string - BehindProxy bool + HttpHost string + HttpPort string + HttpSock string + MaxSize int64 + UploadDir string + StoreSize int64 + Basepath string + Timeout int64 + S3Bucket string + S3ObjectPrefix string + S3Endpoint string + GCSBucket string + FileHooksDir string + HttpHooksEndpoint string + HttpHooksRetry int + HttpHooksBackoff int + HooksStopUploadCode int + ShowVersion bool + ExposeMetrics bool + MetricsPath string + BehindProxy bool FileHooksInstalled bool HttpHooksInstalled bool @@ -48,6 +49,7 @@ func ParseFlags() { flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to") flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout") flag.IntVar(&Flags.HttpHooksBackoff, "hooks-http-backoff", 1, "Number of seconds to wait before retrying each retry") + flag.IntVar(&Flags.HooksStopUploadCode, "hooks-stop-code", 0, "Return code from post-receive hook which causes tusd to stop and delete the current upload. A zero value means that no uploads will be stopped") flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information") flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage") flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible") diff --git a/cmd/tusd/cli/hooks.go b/cmd/tusd/cli/hooks.go index 77683d4..e00734f 100644 --- a/cmd/tusd/cli/hooks.go +++ b/cmd/tusd/cli/hooks.go @@ -109,13 +109,14 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt output := []byte{} err := error(nil) + returnCode := 0 if Flags.FileHooksInstalled { - output, err = invokeFileHook(name, typ, info, captureOutput) + output, returnCode, err = invokeFileHook(name, typ, info, captureOutput) } if Flags.HttpHooksInstalled { - output, err = invokeHttpHook(name, typ, info, captureOutput) + output, returnCode, err = invokeHttpHook(name, typ, info, captureOutput) } if err != nil { @@ -125,18 +126,24 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", info.ID) } + if typ == HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode { + logEv(stdout, "HookStopUpload", "id", info.ID) + + info.StopUpload() + } + return output, err } -func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) { +func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) { jsonInfo, err := json.Marshal(info) if err != nil { - return nil, err + return nil, 0, err } req, err := http.NewRequest("POST", Flags.HttpHooksEndpoint, bytes.NewBuffer(jsonInfo)) if err != nil { - return nil, err + return nil, 0, err } req.Header.Set("Hook-Name", name) @@ -152,27 +159,27 @@ func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput resp, err := client.Do(req) if err != nil { - return nil, err + return nil, 0, err } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, err + return nil, 0, err } if resp.StatusCode >= http.StatusBadRequest { - return body, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body} + return body, resp.StatusCode, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body} } if captureOutput { - return body, err + return body, resp.StatusCode, err } - return nil, err + return nil, resp.StatusCode, err } -func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) { +func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) { hookPath := Flags.FileHooksDir + string(os.PathSeparator) + name cmd := exec.Command(hookPath) env := os.Environ() @@ -182,7 +189,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput jsonInfo, err := json.Marshal(info) if err != nil { - return nil, err + return nil, 0, err } reader := bytes.NewReader(jsonInfo) @@ -208,5 +215,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput err = nil } - return output, err + returnCode := cmd.ProcessState.ExitCode() + + return output, returnCode, err } diff --git a/datastore.go b/datastore.go index c6e4249..2030b89 100644 --- a/datastore.go +++ b/datastore.go @@ -2,6 +2,8 @@ package tusd import ( "io" + + "golang.org/x/net/context" ) type MetaData map[string]string @@ -25,6 +27,21 @@ type FileInfo struct { // ordered slice containing the ids of the uploads of which the final upload // will consist after concatenation. PartialUploads []string + + // stopUpload is the cancel function for the upload's context.Context. When + // invoked it will interrupt the writes to DataStore#WriteChunk. + stopUpload context.CancelFunc +} + +// StopUpload interrupts an running upload from the server-side. This means that +// the current request body is closed, so that the data store does not get any +// more data. Furthermore, a response is sent to notify the client of the +// interrupting and the upload is terminated (if supported by the data store), +// so the upload cannot be resumed anymore. +func (f FileInfo) StopUpload() { + if f.stopUpload != nil { + f.stopUpload() + } } type DataStore interface { diff --git a/patch_test.go b/patch_test.go index 750c345..5fd3762 100644 --- a/patch_test.go +++ b/patch_test.go @@ -485,4 +485,64 @@ func TestPatch(t *testing.T) { _, more := <-c a.False(more) }) + + SubTest(t, "StopUpload", func(t *testing.T, store *MockFullDataStore) { + gomock.InOrder( + store.EXPECT().GetInfo("yes").Return(FileInfo{ + ID: "yes", + Offset: 0, + Size: 100, + }, nil), + store.EXPECT().WriteChunk("yes", int64(0), NewReaderMatcher("first ")).Return(int64(6), http.ErrBodyReadAfterClose), + store.EXPECT().Terminate("yes").Return(nil), + ) + + handler, _ := NewHandler(Config{ + DataStore: store, + NotifyUploadProgress: true, + }) + + c := make(chan FileInfo) + handler.UploadProgress = c + + reader, writer := io.Pipe() + a := assert.New(t) + + go func() { + writer.Write([]byte("first ")) + + info := <-c + info.StopUpload() + + // Wait a short time to ensure that the goroutine in the PATCH + // handler has received and processed the stop event. + <-time.After(10 * time.Millisecond) + + // Assert that the "request body" has been closed. + _, err := writer.Write([]byte("second ")) + a.Equal(err, io.ErrClosedPipe) + + // Close the upload progress handler so that the main goroutine + // can exit properly after waiting for this goroutine to finish. + close(handler.UploadProgress) + }() + + (&httpTest{ + Method: "PATCH", + URL: "yes", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Content-Type": "application/offset+octet-stream", + "Upload-Offset": "0", + }, + ReqBody: reader, + Code: http.StatusBadRequest, + ResHeader: map[string]string{ + "Upload-Offset": "", + }, + }).Run(handler, t) + + _, more := <-c + a.False(more) + }) } diff --git a/unrouted_handler.go b/unrouted_handler.go index 96c1b7c..1c39841 100644 --- a/unrouted_handler.go +++ b/unrouted_handler.go @@ -14,6 +14,8 @@ import ( "strings" "sync/atomic" "time" + + "golang.org/x/net/context" ) const UploadLengthDeferred = "1" @@ -70,6 +72,7 @@ var ( ErrModifyFinal = NewHTTPError(errors.New("modifying a final upload is not allowed"), http.StatusForbidden) ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest) ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest) + ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest) ) // UnroutedHandler exposes methods to handle requests as part of the tus protocol, @@ -535,14 +538,46 @@ func (handler *UnroutedHandler) writeChunk(id string, info FileInfo, w http.Resp // Limit the data read from the request's body to the allowed maximum reader := io.LimitReader(r.Body, maxSize) + // We use a context object to allow the hook system to cancel an upload + uploadCtx, stopUpload := context.WithCancel(context.Background()) + info.stopUpload = stopUpload + // terminateUpload specifies whether the upload should be deleted after + // the write has finished + terminateUpload := false + // Cancel the context when the function exits to ensure that the goroutine + // is properly cleaned up + defer stopUpload() + + go func() { + // Interrupt the Read() call from the request body + <-uploadCtx.Done() + terminateUpload = true + r.Body.Close() + }() + if handler.config.NotifyUploadProgress { - var stop chan<- struct{} - reader, stop = handler.sendProgressMessages(info, reader) - defer close(stop) + var stopProgressEvents chan<- struct{} + reader, stopProgressEvents = handler.sendProgressMessages(info, reader) + defer close(stopProgressEvents) } var err error bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader) + if terminateUpload && handler.composer.UsesTerminater { + if terminateErr := handler.terminateUpload(id, info); terminateErr != nil { + // We only log this error and not show it to the user since this + // termination error is not relevant to the uploading client + handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error()) + } + } + + // The error "http: invalid Read on closed Body" is returned if we stop the upload + // while the data store is still reading. Since this is an implementation detail, + // we replace this error with a message saying that the upload has been stopped. + if err == http.ErrBodyReadAfterClose { + err = ErrUploadStoppedByServer + } + if err != nil { return err } @@ -735,19 +770,33 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) } } - err = handler.composer.Terminater.Terminate(id) + err = handler.terminateUpload(id, info) if err != nil { handler.sendError(w, r, err) return } handler.sendResp(w, r, http.StatusNoContent) +} + +// terminateUpload passes a given upload to the DataStore's Terminater, +// send the corresponding upload info on the TerminatedUploads channnel +// and updates the statistics. +// Note the the info argument is only needed if the terminated uploads +// notifications are enabled. +func (handler *UnroutedHandler) terminateUpload(id string, info FileInfo) error { + err := handler.composer.Terminater.Terminate(id) + if err != nil { + return err + } if handler.config.NotifyTerminatedUploads { handler.TerminatedUploads <- info } handler.Metrics.incUploadsTerminated() + + return nil } // Send the error in the response body. The status code will be looked up in diff --git a/utils_test.go b/utils_test.go index 550e87e..6c52820 100644 --- a/utils_test.go +++ b/utils_test.go @@ -99,6 +99,11 @@ func (m readerMatcher) Matches(x interface{}) bool { } bytes, err := ioutil.ReadAll(input) + // Handle closed pipes similar to how EOF are handled by ioutil.ReadAll, + // we handle this error as if the stream ended normally. + if err == io.ErrClosedPipe { + err = nil + } if err != nil { panic(err) }