From 97bd6db73fa2a1a24958b944e014c0240fbbac6b Mon Sep 17 00:00:00 2001 From: Marius Date: Thu, 19 Jan 2017 21:02:48 +0100 Subject: [PATCH] Add NotifyUploadProgress configuration option --- cmd/tusd/cli/hooks.go | 2 ++ cmd/tusd/cli/serve.go | 1 + config.go | 1 + unrouted_handler.go | 47 +++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 49 insertions(+), 2 deletions(-) diff --git a/cmd/tusd/cli/hooks.go b/cmd/tusd/cli/hooks.go index 7cb8d7f..cddf414 100644 --- a/cmd/tusd/cli/hooks.go +++ b/cmd/tusd/cli/hooks.go @@ -44,6 +44,8 @@ func SetupPostHooks(handler *tusd.Handler) { invokeHook(HookPostFinish, info) case info := <-handler.TerminatedUploads: invokeHook(HookPostTerminate, info) + case info := <-handler.UploadProgress: + fmt.Println(info.Size, info.Offset) } } }() diff --git a/cmd/tusd/cli/serve.go b/cmd/tusd/cli/serve.go index 37d6317..594d66e 100644 --- a/cmd/tusd/cli/serve.go +++ b/cmd/tusd/cli/serve.go @@ -17,6 +17,7 @@ func Serve() { StoreComposer: Composer, NotifyCompleteUploads: true, NotifyTerminatedUploads: true, + NotifyUploadProgress: true, }) if err != nil { stderr.Fatalf("Unable to create handler: %s", err) diff --git a/config.go b/config.go index 1ed82cf..f2517a2 100644 --- a/config.go +++ b/config.go @@ -31,6 +31,7 @@ type Config struct { // NotifyTerminatedUploads indicates whether sending notifications about // terminated uploads using the TerminatedUploads channel should be enabled. NotifyTerminatedUploads bool + NotifyUploadProgress bool // Logger is the logger to use internally, mostly for printing requests. Logger *log.Logger // Respect the X-Forwarded-Host, X-Forwarded-Proto and Forwarded headers diff --git a/unrouted_handler.go b/unrouted_handler.go index 8ceaf3e..fde83c4 100644 --- a/unrouted_handler.go +++ b/unrouted_handler.go @@ -10,6 +10,8 @@ import ( "regexp" "strconv" "strings" + "sync/atomic" + "time" ) var ( @@ -75,6 +77,7 @@ type UnroutedHandler struct { // happen if the NotifyTerminatedUploads field is set to true in the Config // structure. TerminatedUploads chan FileInfo + UploadProgress chan FileInfo // Metrics provides numbers of the usage for this handler. Metrics Metrics } @@ -104,6 +107,7 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) { isBasePathAbs: config.isAbs, CompleteUploads: make(chan FileInfo), TerminatedUploads: make(chan FileInfo), + UploadProgress: make(chan FileInfo), logger: config.Logger, extensions: extensions, Metrics: newMetrics(), @@ -428,12 +432,18 @@ func (handler *UnroutedHandler) writeChunk(id string, info FileInfo, w http.Resp handler.log("ChunkWriteStart", "id", id, "maxSize", i64toa(maxSize), "offset", i64toa(offset)) var bytesWritten int64 - // Prevent a nil pointer derefernce when accessing the body which may not be + // Prevent a nil pointer dereference when accessing the body which may not be // available in the case of a malicious request. if r.Body != nil { - // Limit the data read from the request's body to the allowed maxiumum + // Limit the data read from the request's body to the allowed maximum reader := io.LimitReader(r.Body, maxSize) + if handler.config.NotifyUploadProgress { + var stop chan<- struct{} + reader, stop = handler.sendProgressMessages(info, reader) + defer close(stop) + } + var err error bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader) if err != nil { @@ -625,6 +635,39 @@ func (handler *UnroutedHandler) absFileURL(r *http.Request, id string) string { return url } +type progressWriter struct { + Offset int64 +} + +func (w *progressWriter) Write(b []byte) (int, error) { + atomic.AddInt64(&w.Offset, int64(len(b))) + return len(b), nil +} + +func (handler *UnroutedHandler) sendProgressMessages(info FileInfo, reader io.Reader) (io.Reader, chan<- struct{}) { + progress := &progressWriter{ + Offset: info.Offset, + } + stop := make(chan struct{}, 1) + reader = io.TeeReader(reader, progress) + + go func() { + for { + select { + case <-stop: + info.Offset = atomic.LoadInt64(&progress.Offset) + handler.UploadProgress <- info + return + case <-time.After(1 * time.Second): + info.Offset = atomic.LoadInt64(&progress.Offset) + handler.UploadProgress <- info + } + } + }() + + return reader, stop +} + // getHostAndProtocol extracts the host and used protocol (either HTTP or HTTPS) // from the given request. If `allowForwarded` is set, the X-Forwarded-Host, // X-Forwarded-Proto and Forwarded headers will also be checked to