core: Add ability to stop upload from post-receive hook (#279)

* First implementation of stopping an upload from the server

* Remove unnecessary json tag

* Use golang.org/x/net/context for support in Go < 1.7
This commit is contained in:
Marius 2019-05-26 20:56:51 +01:00 committed by GitHub
parent 14faaafc67
commit d23be46d7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 180 additions and 38 deletions

View File

@ -5,10 +5,10 @@ import (
"github.com/tus/tusd" "github.com/tus/tusd"
"github.com/tus/tusd/filestore" "github.com/tus/tusd/filestore"
"github.com/tus/tusd/gcsstore"
"github.com/tus/tusd/limitedstore" "github.com/tus/tusd/limitedstore"
"github.com/tus/tusd/memorylocker" "github.com/tus/tusd/memorylocker"
"github.com/tus/tusd/s3store" "github.com/tus/tusd/s3store"
"github.com/tus/tusd/gcsstore"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"

View File

@ -6,26 +6,27 @@ import (
) )
var Flags struct { var Flags struct {
HttpHost string HttpHost string
HttpPort string HttpPort string
HttpSock string HttpSock string
MaxSize int64 MaxSize int64
UploadDir string UploadDir string
StoreSize int64 StoreSize int64
Basepath string Basepath string
Timeout int64 Timeout int64
S3Bucket string S3Bucket string
S3ObjectPrefix string S3ObjectPrefix string
S3Endpoint string S3Endpoint string
GCSBucket string GCSBucket string
FileHooksDir string FileHooksDir string
HttpHooksEndpoint string HttpHooksEndpoint string
HttpHooksRetry int HttpHooksRetry int
HttpHooksBackoff int HttpHooksBackoff int
ShowVersion bool HooksStopUploadCode int
ExposeMetrics bool ShowVersion bool
MetricsPath string ExposeMetrics bool
BehindProxy bool MetricsPath string
BehindProxy bool
FileHooksInstalled bool FileHooksInstalled bool
HttpHooksInstalled bool HttpHooksInstalled bool
@ -48,6 +49,7 @@ func ParseFlags() {
flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to") flag.StringVar(&Flags.HttpHooksEndpoint, "hooks-http", "", "An HTTP endpoint to which hook events will be sent to")
flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout") flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout")
flag.IntVar(&Flags.HttpHooksBackoff, "hooks-http-backoff", 1, "Number of seconds to wait before retrying each retry") flag.IntVar(&Flags.HttpHooksBackoff, "hooks-http-backoff", 1, "Number of seconds to wait before retrying each retry")
flag.IntVar(&Flags.HooksStopUploadCode, "hooks-stop-code", 0, "Return code from post-receive hook which causes tusd to stop and delete the current upload. A zero value means that no uploads will be stopped")
flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information") flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information")
flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage") flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage")
flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible") flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible")

View File

@ -109,13 +109,14 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt
output := []byte{} output := []byte{}
err := error(nil) err := error(nil)
returnCode := 0
if Flags.FileHooksInstalled { if Flags.FileHooksInstalled {
output, err = invokeFileHook(name, typ, info, captureOutput) output, returnCode, err = invokeFileHook(name, typ, info, captureOutput)
} }
if Flags.HttpHooksInstalled { if Flags.HttpHooksInstalled {
output, err = invokeHttpHook(name, typ, info, captureOutput) output, returnCode, err = invokeHttpHook(name, typ, info, captureOutput)
} }
if err != nil { if err != nil {
@ -125,18 +126,24 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt
logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", info.ID) logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", info.ID)
} }
if typ == HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode {
logEv(stdout, "HookStopUpload", "id", info.ID)
info.StopUpload()
}
return output, err return output, err
} }
func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) { func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
jsonInfo, err := json.Marshal(info) jsonInfo, err := json.Marshal(info)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
req, err := http.NewRequest("POST", Flags.HttpHooksEndpoint, bytes.NewBuffer(jsonInfo)) req, err := http.NewRequest("POST", Flags.HttpHooksEndpoint, bytes.NewBuffer(jsonInfo))
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
req.Header.Set("Hook-Name", name) req.Header.Set("Hook-Name", name)
@ -152,27 +159,27 @@ func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
defer resp.Body.Close() defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) body, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
if resp.StatusCode >= http.StatusBadRequest { if resp.StatusCode >= http.StatusBadRequest {
return body, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body} return body, resp.StatusCode, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body}
} }
if captureOutput { if captureOutput {
return body, err return body, resp.StatusCode, err
} }
return nil, err return nil, resp.StatusCode, err
} }
func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) { func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
hookPath := Flags.FileHooksDir + string(os.PathSeparator) + name hookPath := Flags.FileHooksDir + string(os.PathSeparator) + name
cmd := exec.Command(hookPath) cmd := exec.Command(hookPath)
env := os.Environ() env := os.Environ()
@ -182,7 +189,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput
jsonInfo, err := json.Marshal(info) jsonInfo, err := json.Marshal(info)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
reader := bytes.NewReader(jsonInfo) reader := bytes.NewReader(jsonInfo)
@ -208,5 +215,7 @@ func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput
err = nil err = nil
} }
return output, err returnCode := cmd.ProcessState.ExitCode()
return output, returnCode, err
} }

View File

@ -2,6 +2,8 @@ package tusd
import ( import (
"io" "io"
"golang.org/x/net/context"
) )
type MetaData map[string]string type MetaData map[string]string
@ -25,6 +27,21 @@ type FileInfo struct {
// ordered slice containing the ids of the uploads of which the final upload // ordered slice containing the ids of the uploads of which the final upload
// will consist after concatenation. // will consist after concatenation.
PartialUploads []string PartialUploads []string
// stopUpload is the cancel function for the upload's context.Context. When
// invoked it will interrupt the writes to DataStore#WriteChunk.
stopUpload context.CancelFunc
}
// StopUpload interrupts an running upload from the server-side. This means that
// the current request body is closed, so that the data store does not get any
// more data. Furthermore, a response is sent to notify the client of the
// interrupting and the upload is terminated (if supported by the data store),
// so the upload cannot be resumed anymore.
func (f FileInfo) StopUpload() {
if f.stopUpload != nil {
f.stopUpload()
}
} }
type DataStore interface { type DataStore interface {

View File

@ -485,4 +485,64 @@ func TestPatch(t *testing.T) {
_, more := <-c _, more := <-c
a.False(more) a.False(more)
}) })
SubTest(t, "StopUpload", func(t *testing.T, store *MockFullDataStore) {
gomock.InOrder(
store.EXPECT().GetInfo("yes").Return(FileInfo{
ID: "yes",
Offset: 0,
Size: 100,
}, nil),
store.EXPECT().WriteChunk("yes", int64(0), NewReaderMatcher("first ")).Return(int64(6), http.ErrBodyReadAfterClose),
store.EXPECT().Terminate("yes").Return(nil),
)
handler, _ := NewHandler(Config{
DataStore: store,
NotifyUploadProgress: true,
})
c := make(chan FileInfo)
handler.UploadProgress = c
reader, writer := io.Pipe()
a := assert.New(t)
go func() {
writer.Write([]byte("first "))
info := <-c
info.StopUpload()
// Wait a short time to ensure that the goroutine in the PATCH
// handler has received and processed the stop event.
<-time.After(10 * time.Millisecond)
// Assert that the "request body" has been closed.
_, err := writer.Write([]byte("second "))
a.Equal(err, io.ErrClosedPipe)
// Close the upload progress handler so that the main goroutine
// can exit properly after waiting for this goroutine to finish.
close(handler.UploadProgress)
}()
(&httpTest{
Method: "PATCH",
URL: "yes",
ReqHeader: map[string]string{
"Tus-Resumable": "1.0.0",
"Content-Type": "application/offset+octet-stream",
"Upload-Offset": "0",
},
ReqBody: reader,
Code: http.StatusBadRequest,
ResHeader: map[string]string{
"Upload-Offset": "",
},
}).Run(handler, t)
_, more := <-c
a.False(more)
})
} }

View File

@ -14,6 +14,8 @@ import (
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
"golang.org/x/net/context"
) )
const UploadLengthDeferred = "1" const UploadLengthDeferred = "1"
@ -70,6 +72,7 @@ var (
ErrModifyFinal = NewHTTPError(errors.New("modifying a final upload is not allowed"), http.StatusForbidden) ErrModifyFinal = NewHTTPError(errors.New("modifying a final upload is not allowed"), http.StatusForbidden)
ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest) ErrUploadLengthAndUploadDeferLength = NewHTTPError(errors.New("provided both Upload-Length and Upload-Defer-Length"), http.StatusBadRequest)
ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest) ErrInvalidUploadDeferLength = NewHTTPError(errors.New("invalid Upload-Defer-Length header"), http.StatusBadRequest)
ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest)
) )
// UnroutedHandler exposes methods to handle requests as part of the tus protocol, // UnroutedHandler exposes methods to handle requests as part of the tus protocol,
@ -535,14 +538,46 @@ func (handler *UnroutedHandler) writeChunk(id string, info FileInfo, w http.Resp
// Limit the data read from the request's body to the allowed maximum // Limit the data read from the request's body to the allowed maximum
reader := io.LimitReader(r.Body, maxSize) reader := io.LimitReader(r.Body, maxSize)
// We use a context object to allow the hook system to cancel an upload
uploadCtx, stopUpload := context.WithCancel(context.Background())
info.stopUpload = stopUpload
// terminateUpload specifies whether the upload should be deleted after
// the write has finished
terminateUpload := false
// Cancel the context when the function exits to ensure that the goroutine
// is properly cleaned up
defer stopUpload()
go func() {
// Interrupt the Read() call from the request body
<-uploadCtx.Done()
terminateUpload = true
r.Body.Close()
}()
if handler.config.NotifyUploadProgress { if handler.config.NotifyUploadProgress {
var stop chan<- struct{} var stopProgressEvents chan<- struct{}
reader, stop = handler.sendProgressMessages(info, reader) reader, stopProgressEvents = handler.sendProgressMessages(info, reader)
defer close(stop) defer close(stopProgressEvents)
} }
var err error var err error
bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader) bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader)
if terminateUpload && handler.composer.UsesTerminater {
if terminateErr := handler.terminateUpload(id, info); terminateErr != nil {
// We only log this error and not show it to the user since this
// termination error is not relevant to the uploading client
handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error())
}
}
// The error "http: invalid Read on closed Body" is returned if we stop the upload
// while the data store is still reading. Since this is an implementation detail,
// we replace this error with a message saying that the upload has been stopped.
if err == http.ErrBodyReadAfterClose {
err = ErrUploadStoppedByServer
}
if err != nil { if err != nil {
return err return err
} }
@ -735,19 +770,33 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
} }
} }
err = handler.composer.Terminater.Terminate(id) err = handler.terminateUpload(id, info)
if err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
} }
handler.sendResp(w, r, http.StatusNoContent) handler.sendResp(w, r, http.StatusNoContent)
}
// terminateUpload passes a given upload to the DataStore's Terminater,
// send the corresponding upload info on the TerminatedUploads channnel
// and updates the statistics.
// Note the the info argument is only needed if the terminated uploads
// notifications are enabled.
func (handler *UnroutedHandler) terminateUpload(id string, info FileInfo) error {
err := handler.composer.Terminater.Terminate(id)
if err != nil {
return err
}
if handler.config.NotifyTerminatedUploads { if handler.config.NotifyTerminatedUploads {
handler.TerminatedUploads <- info handler.TerminatedUploads <- info
} }
handler.Metrics.incUploadsTerminated() handler.Metrics.incUploadsTerminated()
return nil
} }
// Send the error in the response body. The status code will be looked up in // Send the error in the response body. The status code will be looked up in

View File

@ -99,6 +99,11 @@ func (m readerMatcher) Matches(x interface{}) bool {
} }
bytes, err := ioutil.ReadAll(input) bytes, err := ioutil.ReadAll(input)
// Handle closed pipes similar to how EOF are handled by ioutil.ReadAll,
// we handle this error as if the stream ended normally.
if err == io.ErrClosedPipe {
err = nil
}
if err != nil { if err != nil {
panic(err) panic(err)
} }