core: Provide HTTP request details to hooks

Closes https://github.com/tus/tusd/issues/185
This commit is contained in:
Marius 2019-09-19 11:15:48 +02:00
parent d2be5e82bd
commit 6b21772107
13 changed files with 163 additions and 89 deletions

View File

@ -1,7 +1,6 @@
package cli package cli
import ( import (
"context"
"fmt" "fmt"
"strconv" "strconv"
@ -20,22 +19,19 @@ func hookTypeInSlice(a hooks.HookType, list []hooks.HookType) bool {
return false return false
} }
type hookDataStore struct { func preCreateCallback(info handler.HookEvent) error {
handler.DataStore
}
func (store hookDataStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
if output, err := invokeHookSync(hooks.HookPreCreate, info, true); err != nil { if output, err := invokeHookSync(hooks.HookPreCreate, info, true); err != nil {
if hookErr, ok := err.(hooks.HookError); ok { if hookErr, ok := err.(hooks.HookError); ok {
return nil, hooks.NewHookError( return hooks.NewHookError(
fmt.Errorf("pre-create hook failed: %s", err), fmt.Errorf("pre-create hook failed: %s", err),
hookErr.StatusCode(), hookErr.StatusCode(),
hookErr.Body(), hookErr.Body(),
) )
} }
return nil, fmt.Errorf("pre-create hook failed: %s\n%s", err, string(output)) return fmt.Errorf("pre-create hook failed: %s\n%s", err, string(output))
} }
return store.DataStore.NewUpload(ctx, info)
return nil
} }
func SetupHookMetrics() { func SetupHookMetrics() {
@ -46,7 +42,7 @@ func SetupHookMetrics() {
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPreCreate)).Add(0) MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPreCreate)).Add(0)
} }
func SetupPreHooks(composer *handler.StoreComposer) error { func SetupPreHooks(config *handler.Config) error {
if Flags.FileHooksDir != "" { if Flags.FileHooksDir != "" {
hookHandler = &hooks.FileHook{ hookHandler = &hooks.FileHook{
Directory: Flags.FileHooksDir, Directory: Flags.FileHooksDir,
@ -69,9 +65,8 @@ func SetupPreHooks(composer *handler.StoreComposer) error {
return err return err
} }
composer.UseCore(hookDataStore{ config.PreUploadCreateCallback = preCreateCallback
DataStore: composer.Core,
})
return nil return nil
} }
@ -92,23 +87,26 @@ func SetupPostHooks(handler *handler.Handler) {
}() }()
} }
func invokeHookAsync(typ hooks.HookType, info handler.FileInfo) { func invokeHookAsync(typ hooks.HookType, info handler.HookEvent) {
go func() { go func() {
// Error handling is taken care by the function. // Error handling is taken care by the function.
_, _ = invokeHookSync(typ, info, false) _, _ = invokeHookSync(typ, info, false)
}() }()
} }
func invokeHookSync(typ hooks.HookType, info handler.FileInfo, captureOutput bool) ([]byte, error) { func invokeHookSync(typ hooks.HookType, info handler.HookEvent, captureOutput bool) ([]byte, error) {
if !hookTypeInSlice(typ, Flags.EnabledHooks) { if !hookTypeInSlice(typ, Flags.EnabledHooks) {
return nil, nil return nil, nil
} }
id := info.Upload.ID
size := info.Upload.Size
switch typ { switch typ {
case hooks.HookPostFinish: case hooks.HookPostFinish:
logEv(stdout, "UploadFinished", "id", info.ID, "size", strconv.FormatInt(info.Size, 10)) logEv(stdout, "UploadFinished", "id", id, "size", strconv.FormatInt(size, 10))
case hooks.HookPostTerminate: case hooks.HookPostTerminate:
logEv(stdout, "UploadTerminated", "id", info.ID) logEv(stdout, "UploadTerminated", "id", id)
} }
if hookHandler == nil { if hookHandler == nil {
@ -117,22 +115,22 @@ func invokeHookSync(typ hooks.HookType, info handler.FileInfo, captureOutput boo
name := string(typ) name := string(typ)
if Flags.VerboseOutput { if Flags.VerboseOutput {
logEv(stdout, "HookInvocationStart", "type", name, "id", info.ID) logEv(stdout, "HookInvocationStart", "type", name, "id", id)
} }
output, returnCode, err := hookHandler.InvokeHook(typ, info, captureOutput) output, returnCode, err := hookHandler.InvokeHook(typ, info, captureOutput)
if err != nil { if err != nil {
logEv(stderr, "HookInvocationError", "type", string(typ), "id", info.ID, "error", err.Error()) logEv(stderr, "HookInvocationError", "type", string(typ), "id", id, "error", err.Error())
MetricsHookErrorsTotal.WithLabelValues(string(typ)).Add(1) MetricsHookErrorsTotal.WithLabelValues(string(typ)).Add(1)
} else if Flags.VerboseOutput { } else if Flags.VerboseOutput {
logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", info.ID) logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", id)
} }
if typ == hooks.HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode { if typ == hooks.HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode {
logEv(stdout, "HookStopUpload", "id", info.ID) logEv(stdout, "HookStopUpload", "id", id)
info.StopUpload() info.Upload.StopUpload()
} }
return output, err return output, err

View File

@ -18,13 +18,13 @@ func (_ FileHook) Setup() error {
return nil return nil
} }
func (h FileHook) InvokeHook(typ HookType, info handler.FileInfo, captureOutput bool) ([]byte, int, error) { func (h FileHook) InvokeHook(typ HookType, info handler.HookEvent, captureOutput bool) ([]byte, int, error) {
hookPath := h.Directory + string(os.PathSeparator) + string(typ) hookPath := h.Directory + string(os.PathSeparator) + string(typ)
cmd := exec.Command(hookPath) cmd := exec.Command(hookPath)
env := os.Environ() env := os.Environ()
env = append(env, "TUS_ID="+info.ID) env = append(env, "TUS_ID="+info.Upload.ID)
env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Size, 10)) env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Upload.Size, 10))
env = append(env, "TUS_OFFSET="+strconv.FormatInt(info.Offset, 10)) env = append(env, "TUS_OFFSET="+strconv.FormatInt(info.Upload.Offset, 10))
jsonInfo, err := json.Marshal(info) jsonInfo, err := json.Marshal(info)
if err != nil { if err != nil {

View File

@ -6,7 +6,7 @@ import (
type HookHandler interface { type HookHandler interface {
Setup() error Setup() error
InvokeHook(typ HookType, info handler.FileInfo, captureOutput bool) ([]byte, int, error) InvokeHook(typ HookType, info handler.HookEvent, captureOutput bool) ([]byte, int, error)
} }
type HookType string type HookType string

View File

@ -23,7 +23,7 @@ func (_ HttpHook) Setup() error {
return nil return nil
} }
func (h HttpHook) InvokeHook(typ HookType, info handler.FileInfo, captureOutput bool) ([]byte, int, error) { func (h HttpHook) InvokeHook(typ HookType, info handler.HookEvent, captureOutput bool) ([]byte, int, error) {
jsonInfo, err := json.Marshal(info) jsonInfo, err := json.Marshal(info)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err

View File

@ -8,11 +8,11 @@ import (
) )
type PluginHookHandler interface { type PluginHookHandler interface {
PreCreate(info handler.FileInfo) error PreCreate(info handler.HookEvent) error
PostCreate(info handler.FileInfo) error PostCreate(info handler.HookEvent) error
PostReceive(info handler.FileInfo) error PostReceive(info handler.HookEvent) error
PostFinish(info handler.FileInfo) error PostFinish(info handler.HookEvent) error
PostTerminate(info handler.FileInfo) error PostTerminate(info handler.HookEvent) error
} }
type PluginHook struct { type PluginHook struct {
@ -41,7 +41,7 @@ func (h *PluginHook) Setup() error {
return nil return nil
} }
func (h PluginHook) InvokeHook(typ HookType, info handler.FileInfo, captureOutput bool) ([]byte, int, error) { func (h PluginHook) InvokeHook(typ HookType, info handler.HookEvent, captureOutput bool) ([]byte, int, error) {
var err error var err error
switch typ { switch typ {
case HookPostFinish: case HookPostFinish:

View File

@ -16,11 +16,7 @@ import (
// specified, in which case a different socket creation and binding mechanism // specified, in which case a different socket creation and binding mechanism
// is put in place. // is put in place.
func Serve() { func Serve() {
if err := SetupPreHooks(Composer); err != nil { config := handler.Config{
stderr.Fatalf("Unable to setup hooks for handler: %s", err)
}
handler, err := handler.NewHandler(handler.Config{
MaxSize: Flags.MaxSize, MaxSize: Flags.MaxSize,
BasePath: Flags.Basepath, BasePath: Flags.Basepath,
RespectForwardedHeaders: Flags.BehindProxy, RespectForwardedHeaders: Flags.BehindProxy,
@ -29,7 +25,13 @@ func Serve() {
NotifyTerminatedUploads: true, NotifyTerminatedUploads: true,
NotifyUploadProgress: true, NotifyUploadProgress: true,
NotifyCreatedUploads: true, NotifyCreatedUploads: true,
}) }
if err := SetupPreHooks(&config); err != nil {
stderr.Fatalf("Unable to setup hooks for handler: %s", err)
}
handler, err := handler.NewHandler(config)
if err != nil { if err != nil {
stderr.Fatalf("Unable to create handler: %s", err) stderr.Fatalf("Unable to create handler: %s", err)
} }

View File

@ -150,7 +150,7 @@ func TestConcat(t *testing.T) {
NotifyCompleteUploads: true, NotifyCompleteUploads: true,
}) })
c := make(chan FileInfo, 1) c := make(chan HookEvent, 1)
handler.CompleteUploads = c handler.CompleteUploads = c
(&httpTest{ (&httpTest{
@ -160,18 +160,25 @@ func TestConcat(t *testing.T) {
// A space between `final;` and the first URL should be allowed due to // A space between `final;` and the first URL should be allowed due to
// compatibility reasons, even if the specification does not define // compatibility reasons, even if the specification does not define
// it. Therefore this character is included in this test case. // it. Therefore this character is included in this test case.
"Upload-Concat": "final; http://tus.io/files/a /files/b/", "Upload-Concat": "final; http://tus.io/files/a /files/b/",
"X-Custom-Header": "tada",
}, },
Code: http.StatusCreated, Code: http.StatusCreated,
}).Run(handler, t) }).Run(handler, t)
info := <-c event := <-c
info := event.Upload
a.Equal("foo", info.ID) a.Equal("foo", info.ID)
a.EqualValues(10, info.Size) a.EqualValues(10, info.Size)
a.EqualValues(10, info.Offset) a.EqualValues(10, info.Offset)
a.False(info.IsPartial) a.False(info.IsPartial)
a.True(info.IsFinal) a.True(info.IsFinal)
a.Equal([]string{"a", "b"}, info.PartialUploads) a.Equal([]string{"a", "b"}, info.PartialUploads)
req := event.HTTPRequest
a.Equal("POST", req.Method)
a.Equal("", req.URI)
a.Equal("tada", req.Header.Get("X-Custom-Header"))
}) })
SubTest(t, "Status", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "Status", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {

View File

@ -40,6 +40,11 @@ type Config struct {
// potentially set by proxies when generating an absolute URL in the // potentially set by proxies when generating an absolute URL in the
// response to POST requests. // response to POST requests.
RespectForwardedHeaders bool RespectForwardedHeaders bool
// PreUploadreateCCallback will be invoked before a new upload is created, if the
// property is supplied. If the callback returns nil, the upload will be created.
// Otherwise the HTTP request will be aborted. This can be used to implement
// validation of upload metadata etc.
PreUploadCreateCallback func(hook HookEvent) error
} }
func (config *Config) validate() error { func (config *Config) validate() error {

View File

@ -38,7 +38,7 @@ func TestPatch(t *testing.T) {
NotifyCompleteUploads: true, NotifyCompleteUploads: true,
}) })
c := make(chan FileInfo, 1) c := make(chan HookEvent, 1)
handler.CompleteUploads = c handler.CompleteUploads = c
(&httpTest{ (&httpTest{
@ -57,10 +57,16 @@ func TestPatch(t *testing.T) {
}).Run(handler, t) }).Run(handler, t)
a := assert.New(t) a := assert.New(t)
info := <-c event := <-c
info := event.Upload
a.Equal("yes", info.ID) a.Equal("yes", info.ID)
a.EqualValues(int64(10), info.Size) a.EqualValues(int64(10), info.Size)
a.Equal(int64(10), info.Offset) a.Equal(int64(10), info.Offset)
req := event.HTTPRequest
a.Equal("PATCH", req.Method)
a.Equal("yes", req.URI)
a.Equal("5", req.Header.Get("Upload-Offset"))
}) })
SubTest(t, "MethodOverriding", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "MethodOverriding", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {
@ -506,7 +512,7 @@ func TestPatch(t *testing.T) {
NotifyUploadProgress: true, NotifyUploadProgress: true,
}) })
c := make(chan FileInfo) c := make(chan HookEvent)
handler.UploadProgress = c handler.UploadProgress = c
reader, writer := io.Pipe() reader, writer := io.Pipe()
@ -514,8 +520,9 @@ func TestPatch(t *testing.T) {
go func() { go func() {
writer.Write([]byte("first ")) writer.Write([]byte("first "))
event := <-c
info := <-c info := event.Upload
a.Equal("yes", info.ID) a.Equal("yes", info.ID)
a.Equal(int64(100), info.Size) a.Equal(int64(100), info.Size)
a.Equal(int64(6), info.Offset) a.Equal(int64(6), info.Offset)
@ -523,7 +530,8 @@ func TestPatch(t *testing.T) {
writer.Write([]byte("second ")) writer.Write([]byte("second "))
writer.Write([]byte("third")) writer.Write([]byte("third"))
info = <-c event = <-c
info = event.Upload
a.Equal("yes", info.ID) a.Equal("yes", info.ID)
a.Equal(int64(100), info.Size) a.Equal(int64(100), info.Size)
a.Equal(int64(18), info.Offset) a.Equal(int64(18), info.Offset)
@ -580,7 +588,7 @@ func TestPatch(t *testing.T) {
NotifyUploadProgress: true, NotifyUploadProgress: true,
}) })
c := make(chan FileInfo) c := make(chan HookEvent)
handler.UploadProgress = c handler.UploadProgress = c
reader, writer := io.Pipe() reader, writer := io.Pipe()
@ -589,7 +597,8 @@ func TestPatch(t *testing.T) {
go func() { go func() {
writer.Write([]byte("first ")) writer.Write([]byte("first "))
info := <-c event := <-c
info := event.Upload
info.StopUpload() info.StopUpload()
// Wait a short time to ensure that the goroutine in the PATCH // Wait a short time to ensure that the goroutine in the PATCH

View File

@ -43,7 +43,7 @@ func TestPost(t *testing.T) {
NotifyCreatedUploads: true, NotifyCreatedUploads: true,
}) })
c := make(chan FileInfo, 1) c := make(chan HookEvent, 1)
handler.CreatedUploads = c handler.CreatedUploads = c
(&httpTest{ (&httpTest{
@ -60,7 +60,8 @@ func TestPost(t *testing.T) {
}, },
}).Run(handler, t) }).Run(handler, t)
info := <-c event := <-c
info := event.Upload
a := assert.New(t) a := assert.New(t)
a.Equal("foo", info.ID) a.Equal("foo", info.ID)
@ -91,7 +92,7 @@ func TestPost(t *testing.T) {
NotifyCompleteUploads: true, NotifyCompleteUploads: true,
}) })
handler.CompleteUploads = make(chan FileInfo, 1) handler.CompleteUploads = make(chan HookEvent, 1)
(&httpTest{ (&httpTest{
Method: "POST", Method: "POST",
@ -105,12 +106,17 @@ func TestPost(t *testing.T) {
}, },
}).Run(handler, t) }).Run(handler, t)
info := <-handler.CompleteUploads event := <-handler.CompleteUploads
info := event.Upload
a := assert.New(t) a := assert.New(t)
a.Equal("foo", info.ID) a.Equal("foo", info.ID)
a.Equal(int64(0), info.Size) a.Equal(int64(0), info.Size)
a.Equal(int64(0), info.Offset) a.Equal(int64(0), info.Offset)
req := event.HTTPRequest
a.Equal("POST", req.Method)
a.Equal("", req.URI)
}) })
SubTest(t, "CreateExceedingMaxSizeFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "CreateExceedingMaxSizeFail", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {

View File

@ -60,7 +60,7 @@ func TestTerminate(t *testing.T) {
NotifyTerminatedUploads: true, NotifyTerminatedUploads: true,
}) })
c := make(chan FileInfo, 1) c := make(chan HookEvent, 1)
handler.TerminatedUploads = c handler.TerminatedUploads = c
(&httpTest{ (&httpTest{
@ -72,11 +72,16 @@ func TestTerminate(t *testing.T) {
Code: http.StatusNoContent, Code: http.StatusNoContent,
}).Run(handler, t) }).Run(handler, t)
info := <-c event := <-c
info := event.Upload
a := assert.New(t) a := assert.New(t)
a.Equal("foo", info.ID) a.Equal("foo", info.ID)
a.Equal(int64(10), info.Size) a.Equal(int64(10), info.Size)
req := event.HTTPRequest
a.Equal("DELETE", req.Method)
a.Equal("foo", req.URI)
}) })
SubTest(t, "NotProvided", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { SubTest(t, "NotProvided", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) {

View File

@ -74,6 +74,40 @@ var (
ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest) ErrUploadStoppedByServer = NewHTTPError(errors.New("upload has been stopped by server"), http.StatusBadRequest)
) )
// HTTPRequest contains basic details of an incoming HTTP request.
type HTTPRequest struct {
// Method is the HTTP method, e.g. POST or PATCH
Method string
// URI is the full HTTP request URI, e.g. /files/fooo
URI string
// RemoteAddr contains the network address that sent the request
RemoteAddr string
// Header contains all HTTP headers as present in the HTTP request.
Header http.Header
}
// HookEvent represents an event from tusd which can be handled by the application.
type HookEvent struct {
// Upload contains information about the upload that caused this hook
// to be fired.
Upload FileInfo
// HTTPRequest contains details about the HTTP request that reached
// tusd.
HTTPRequest HTTPRequest
}
func newHookEvent(info FileInfo, r *http.Request) HookEvent {
return HookEvent{
Upload: info,
HTTPRequest: HTTPRequest{
Method: r.Method,
URI: r.RequestURI,
RemoteAddr: r.RemoteAddr,
Header: r.Header,
},
}
}
// UnroutedHandler exposes methods to handle requests as part of the tus protocol, // UnroutedHandler exposes methods to handle requests as part of the tus protocol,
// such as PostFile, HeadFile, PatchFile and DelFile. In addition the GetFile method // such as PostFile, HeadFile, PatchFile and DelFile. In addition the GetFile method
// is provided which is, however, not part of the specification. // is provided which is, however, not part of the specification.
@ -86,33 +120,33 @@ type UnroutedHandler struct {
extensions string extensions string
// CompleteUploads is used to send notifications whenever an upload is // CompleteUploads is used to send notifications whenever an upload is
// completed by a user. The FileInfo will contain information about this // completed by a user. The HookEvent will contain information about this
// upload after it is completed. Sending to this channel will only // upload after it is completed. Sending to this channel will only
// happen if the NotifyCompleteUploads field is set to true in the Config // happen if the NotifyCompleteUploads field is set to true in the Config
// structure. Notifications will also be sent for completions using the // structure. Notifications will also be sent for completions using the
// Concatenation extension. // Concatenation extension.
CompleteUploads chan FileInfo CompleteUploads chan HookEvent
// TerminatedUploads is used to send notifications whenever an upload is // TerminatedUploads is used to send notifications whenever an upload is
// terminated by a user. The FileInfo will contain information about this // terminated by a user. The HookEvent will contain information about this
// upload gathered before the termination. Sending to this channel will only // upload gathered before the termination. Sending to this channel will only
// happen if the NotifyTerminatedUploads field is set to true in the Config // happen if the NotifyTerminatedUploads field is set to true in the Config
// structure. // structure.
TerminatedUploads chan FileInfo TerminatedUploads chan HookEvent
// UploadProgress is used to send notifications about the progress of the // UploadProgress is used to send notifications about the progress of the
// currently running uploads. For each open PATCH request, every second // currently running uploads. For each open PATCH request, every second
// a FileInfo instance will be send over this channel with the Offset field // a HookEvent instance will be send over this channel with the Offset field
// being set to the number of bytes which have been transfered to the server. // being set to the number of bytes which have been transfered to the server.
// Please be aware that this number may be higher than the number of bytes // Please be aware that this number may be higher than the number of bytes
// which have been stored by the data store! Sending to this channel will only // which have been stored by the data store! Sending to this channel will only
// happen if the NotifyUploadProgress field is set to true in the Config // happen if the NotifyUploadProgress field is set to true in the Config
// structure. // structure.
UploadProgress chan FileInfo UploadProgress chan HookEvent
// CreatedUploads is used to send notifications about the uploads having been // CreatedUploads is used to send notifications about the uploads having been
// created. It triggers post creation and therefore has all the FileInfo incl. // created. It triggers post creation and therefore has all the HookEvent incl.
// the ID available already. It facilitates the post-create hook. Sending to // the ID available already. It facilitates the post-create hook. Sending to
// this channel will only happen if the NotifyCreatedUploads field is set to // this channel will only happen if the NotifyCreatedUploads field is set to
// true in the Config structure. // true in the Config structure.
CreatedUploads chan FileInfo CreatedUploads chan HookEvent
// Metrics provides numbers of the usage for this handler. // Metrics provides numbers of the usage for this handler.
Metrics Metrics Metrics Metrics
} }
@ -143,10 +177,10 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) {
composer: config.StoreComposer, composer: config.StoreComposer,
basePath: config.BasePath, basePath: config.BasePath,
isBasePathAbs: config.isAbs, isBasePathAbs: config.isAbs,
CompleteUploads: make(chan FileInfo), CompleteUploads: make(chan HookEvent),
TerminatedUploads: make(chan FileInfo), TerminatedUploads: make(chan HookEvent),
UploadProgress: make(chan FileInfo), UploadProgress: make(chan HookEvent),
CreatedUploads: make(chan FileInfo), CreatedUploads: make(chan HookEvent),
logger: config.Logger, logger: config.Logger,
extensions: extensions, extensions: extensions,
Metrics: newMetrics(), Metrics: newMetrics(),
@ -306,6 +340,13 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
PartialUploads: partialUploads, PartialUploads: partialUploads,
} }
if handler.config.PreUploadCreateCallback != nil {
if err := handler.config.PreUploadCreateCallback(newHookEvent(info, r)); err != nil {
handler.sendError(w, r, err)
return
}
}
upload, err := handler.composer.Core.NewUpload(ctx, info) upload, err := handler.composer.Core.NewUpload(ctx, info)
if err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
@ -329,7 +370,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
handler.log("UploadCreated", "id", id, "size", i64toa(size), "url", url) handler.log("UploadCreated", "id", id, "size", i64toa(size), "url", url)
if handler.config.NotifyCreatedUploads { if handler.config.NotifyCreatedUploads {
handler.CreatedUploads <- info handler.CreatedUploads <- newHookEvent(info, r)
} }
if isFinal { if isFinal {
@ -340,7 +381,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
info.Offset = size info.Offset = size
if handler.config.NotifyCompleteUploads { if handler.config.NotifyCompleteUploads {
handler.CompleteUploads <- info handler.CompleteUploads <- newHookEvent(info, r)
} }
} }
@ -363,7 +404,7 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request)
// Directly finish the upload if the upload is empty (i.e. has a size of 0). // Directly finish the upload if the upload is empty (i.e. has a size of 0).
// This statement is in an else-if block to avoid causing duplicate calls // This statement is in an else-if block to avoid causing duplicate calls
// to finishUploadIfComplete if an upload is empty and contains a chunk. // to finishUploadIfComplete if an upload is empty and contains a chunk.
handler.finishUploadIfComplete(ctx, upload, info) handler.finishUploadIfComplete(ctx, upload, info, r)
} }
handler.sendResp(w, r, http.StatusCreated) handler.sendResp(w, r, http.StatusCreated)
@ -590,14 +631,14 @@ func (handler *UnroutedHandler) writeChunk(upload Upload, info FileInfo, w http.
if handler.config.NotifyUploadProgress { if handler.config.NotifyUploadProgress {
var stopProgressEvents chan<- struct{} var stopProgressEvents chan<- struct{}
reader, stopProgressEvents = handler.sendProgressMessages(info, reader) reader, stopProgressEvents = handler.sendProgressMessages(newHookEvent(info, r), reader)
defer close(stopProgressEvents) defer close(stopProgressEvents)
} }
var err error var err error
bytesWritten, err = upload.WriteChunk(ctx, offset, reader) bytesWritten, err = upload.WriteChunk(ctx, offset, reader)
if terminateUpload && handler.composer.UsesTerminater { if terminateUpload && handler.composer.UsesTerminater {
if terminateErr := handler.terminateUpload(ctx, upload, info); terminateErr != nil { if terminateErr := handler.terminateUpload(ctx, upload, info, r); terminateErr != nil {
// We only log this error and not show it to the user since this // We only log this error and not show it to the user since this
// termination error is not relevant to the uploading client // termination error is not relevant to the uploading client
handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error()) handler.log("UploadStopTerminateError", "id", id, "error", terminateErr.Error())
@ -624,13 +665,13 @@ func (handler *UnroutedHandler) writeChunk(upload Upload, info FileInfo, w http.
handler.Metrics.incBytesReceived(uint64(bytesWritten)) handler.Metrics.incBytesReceived(uint64(bytesWritten))
info.Offset = newOffset info.Offset = newOffset
return handler.finishUploadIfComplete(ctx, upload, info) return handler.finishUploadIfComplete(ctx, upload, info, r)
} }
// finishUploadIfComplete checks whether an upload is completed (i.e. upload offset // finishUploadIfComplete checks whether an upload is completed (i.e. upload offset
// matches upload size) and if so, it will call the data store's FinishUpload // matches upload size) and if so, it will call the data store's FinishUpload
// function and send the necessary message on the CompleteUpload channel. // function and send the necessary message on the CompleteUpload channel.
func (handler *UnroutedHandler) finishUploadIfComplete(ctx context.Context, upload Upload, info FileInfo) error { func (handler *UnroutedHandler) finishUploadIfComplete(ctx context.Context, upload Upload, info FileInfo, r *http.Request) error {
// If the upload is completed, ... // If the upload is completed, ...
if !info.SizeIsDeferred && info.Offset == info.Size { if !info.SizeIsDeferred && info.Offset == info.Size {
// ... allow custom mechanism to finish and cleanup the upload // ... allow custom mechanism to finish and cleanup the upload
@ -640,7 +681,7 @@ func (handler *UnroutedHandler) finishUploadIfComplete(ctx context.Context, uplo
// ... send the info out to the channel // ... send the info out to the channel
if handler.config.NotifyCompleteUploads { if handler.config.NotifyCompleteUploads {
handler.CompleteUploads <- info handler.CompleteUploads <- newHookEvent(info, r)
} }
handler.Metrics.incUploadsFinished() handler.Metrics.incUploadsFinished()
@ -812,7 +853,7 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
} }
} }
err = handler.terminateUpload(ctx, upload, info) err = handler.terminateUpload(ctx, upload, info, r)
if err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
@ -826,7 +867,7 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
// and updates the statistics. // and updates the statistics.
// Note the the info argument is only needed if the terminated uploads // Note the the info argument is only needed if the terminated uploads
// notifications are enabled. // notifications are enabled.
func (handler *UnroutedHandler) terminateUpload(ctx context.Context, upload Upload, info FileInfo) error { func (handler *UnroutedHandler) terminateUpload(ctx context.Context, upload Upload, info FileInfo, r *http.Request) error {
terminatableUpload := handler.composer.Terminater.AsTerminatableUpload(upload) terminatableUpload := handler.composer.Terminater.AsTerminatableUpload(upload)
err := terminatableUpload.Terminate(ctx) err := terminatableUpload.Terminate(ctx)
@ -835,7 +876,7 @@ func (handler *UnroutedHandler) terminateUpload(ctx context.Context, upload Uplo
} }
if handler.config.NotifyTerminatedUploads { if handler.config.NotifyTerminatedUploads {
handler.TerminatedUploads <- info handler.TerminatedUploads <- newHookEvent(info, r)
} }
handler.Metrics.incUploadsTerminated() handler.Metrics.incUploadsTerminated()
@ -921,10 +962,10 @@ func (w *progressWriter) Write(b []byte) (int, error) {
// every second, indicating how much data has been transfered to the server. // every second, indicating how much data has been transfered to the server.
// It will stop sending these instances once the returned channel has been // It will stop sending these instances once the returned channel has been
// closed. The returned reader should be used to read the request body. // closed. The returned reader should be used to read the request body.
func (handler *UnroutedHandler) sendProgressMessages(info FileInfo, reader io.Reader) (io.Reader, chan<- struct{}) { func (handler *UnroutedHandler) sendProgressMessages(hook HookEvent, reader io.Reader) (io.Reader, chan<- struct{}) {
previousOffset := int64(0) previousOffset := int64(0)
progress := &progressWriter{ progress := &progressWriter{
Offset: info.Offset, Offset: hook.Upload.Offset,
} }
stop := make(chan struct{}, 1) stop := make(chan struct{}, 1)
reader = io.TeeReader(reader, progress) reader = io.TeeReader(reader, progress)
@ -933,17 +974,17 @@ func (handler *UnroutedHandler) sendProgressMessages(info FileInfo, reader io.Re
for { for {
select { select {
case <-stop: case <-stop:
info.Offset = atomic.LoadInt64(&progress.Offset) hook.Upload.Offset = atomic.LoadInt64(&progress.Offset)
if info.Offset != previousOffset { if hook.Upload.Offset != previousOffset {
handler.UploadProgress <- info handler.UploadProgress <- hook
previousOffset = info.Offset previousOffset = hook.Upload.Offset
} }
return return
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
info.Offset = atomic.LoadInt64(&progress.Offset) hook.Upload.Offset = atomic.LoadInt64(&progress.Offset)
if info.Offset != previousOffset { if hook.Upload.Offset != previousOffset {
handler.UploadProgress <- info handler.UploadProgress <- hook
previousOffset = info.Offset previousOffset = hook.Upload.Offset
} }
} }
} }

View File

@ -58,6 +58,7 @@ type httpTest struct {
func (test *httpTest) Run(handler http.Handler, t *testing.T) *httptest.ResponseRecorder { func (test *httpTest) Run(handler http.Handler, t *testing.T) *httptest.ResponseRecorder {
req, _ := http.NewRequest(test.Method, test.URL, test.ReqBody) req, _ := http.NewRequest(test.Method, test.URL, test.ReqBody)
req.RequestURI = test.URL
// Add headers // Add headers
for key, value := range test.ReqHeader { for key, value := range test.ReqHeader {