Add NotifyUploadProgress configuration option

This commit is contained in:
Marius 2017-01-19 21:02:48 +01:00
parent d5a0bd6e02
commit 97bd6db73f
4 changed files with 49 additions and 2 deletions

View File

@ -44,6 +44,8 @@ func SetupPostHooks(handler *tusd.Handler) {
invokeHook(HookPostFinish, info) invokeHook(HookPostFinish, info)
case info := <-handler.TerminatedUploads: case info := <-handler.TerminatedUploads:
invokeHook(HookPostTerminate, info) invokeHook(HookPostTerminate, info)
case info := <-handler.UploadProgress:
fmt.Println(info.Size, info.Offset)
} }
} }
}() }()

View File

@ -17,6 +17,7 @@ func Serve() {
StoreComposer: Composer, StoreComposer: Composer,
NotifyCompleteUploads: true, NotifyCompleteUploads: true,
NotifyTerminatedUploads: true, NotifyTerminatedUploads: true,
NotifyUploadProgress: true,
}) })
if err != nil { if err != nil {
stderr.Fatalf("Unable to create handler: %s", err) stderr.Fatalf("Unable to create handler: %s", err)

View File

@ -31,6 +31,7 @@ type Config struct {
// NotifyTerminatedUploads indicates whether sending notifications about // NotifyTerminatedUploads indicates whether sending notifications about
// terminated uploads using the TerminatedUploads channel should be enabled. // terminated uploads using the TerminatedUploads channel should be enabled.
NotifyTerminatedUploads bool NotifyTerminatedUploads bool
NotifyUploadProgress bool
// Logger is the logger to use internally, mostly for printing requests. // Logger is the logger to use internally, mostly for printing requests.
Logger *log.Logger Logger *log.Logger
// Respect the X-Forwarded-Host, X-Forwarded-Proto and Forwarded headers // Respect the X-Forwarded-Host, X-Forwarded-Proto and Forwarded headers

View File

@ -10,6 +10,8 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time"
) )
var ( var (
@ -75,6 +77,7 @@ type UnroutedHandler struct {
// happen if the NotifyTerminatedUploads field is set to true in the Config // happen if the NotifyTerminatedUploads field is set to true in the Config
// structure. // structure.
TerminatedUploads chan FileInfo TerminatedUploads chan FileInfo
UploadProgress chan FileInfo
// Metrics provides numbers of the usage for this handler. // Metrics provides numbers of the usage for this handler.
Metrics Metrics Metrics Metrics
} }
@ -104,6 +107,7 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) {
isBasePathAbs: config.isAbs, isBasePathAbs: config.isAbs,
CompleteUploads: make(chan FileInfo), CompleteUploads: make(chan FileInfo),
TerminatedUploads: make(chan FileInfo), TerminatedUploads: make(chan FileInfo),
UploadProgress: make(chan FileInfo),
logger: config.Logger, logger: config.Logger,
extensions: extensions, extensions: extensions,
Metrics: newMetrics(), Metrics: newMetrics(),
@ -428,12 +432,18 @@ func (handler *UnroutedHandler) writeChunk(id string, info FileInfo, w http.Resp
handler.log("ChunkWriteStart", "id", id, "maxSize", i64toa(maxSize), "offset", i64toa(offset)) handler.log("ChunkWriteStart", "id", id, "maxSize", i64toa(maxSize), "offset", i64toa(offset))
var bytesWritten int64 var bytesWritten int64
// Prevent a nil pointer derefernce when accessing the body which may not be // Prevent a nil pointer dereference when accessing the body which may not be
// available in the case of a malicious request. // available in the case of a malicious request.
if r.Body != nil { if r.Body != nil {
// Limit the data read from the request's body to the allowed maxiumum // Limit the data read from the request's body to the allowed maximum
reader := io.LimitReader(r.Body, maxSize) reader := io.LimitReader(r.Body, maxSize)
if handler.config.NotifyUploadProgress {
var stop chan<- struct{}
reader, stop = handler.sendProgressMessages(info, reader)
defer close(stop)
}
var err error var err error
bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader) bytesWritten, err = handler.composer.Core.WriteChunk(id, offset, reader)
if err != nil { if err != nil {
@ -625,6 +635,39 @@ func (handler *UnroutedHandler) absFileURL(r *http.Request, id string) string {
return url return url
} }
type progressWriter struct {
Offset int64
}
func (w *progressWriter) Write(b []byte) (int, error) {
atomic.AddInt64(&w.Offset, int64(len(b)))
return len(b), nil
}
func (handler *UnroutedHandler) sendProgressMessages(info FileInfo, reader io.Reader) (io.Reader, chan<- struct{}) {
progress := &progressWriter{
Offset: info.Offset,
}
stop := make(chan struct{}, 1)
reader = io.TeeReader(reader, progress)
go func() {
for {
select {
case <-stop:
info.Offset = atomic.LoadInt64(&progress.Offset)
handler.UploadProgress <- info
return
case <-time.After(1 * time.Second):
info.Offset = atomic.LoadInt64(&progress.Offset)
handler.UploadProgress <- info
}
}
}()
return reader, stop
}
// getHostAndProtocol extracts the host and used protocol (either HTTP or HTTPS) // getHostAndProtocol extracts the host and used protocol (either HTTP or HTTPS)
// from the given request. If `allowForwarded` is set, the X-Forwarded-Host, // from the given request. If `allowForwarded` is set, the X-Forwarded-Host,
// X-Forwarded-Proto and Forwarded headers will also be checked to // X-Forwarded-Proto and Forwarded headers will also be checked to