diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go new file mode 100644 index 0000000..492433b --- /dev/null +++ b/cmd/tusd/cli/composer.go @@ -0,0 +1,61 @@ +package cli + +import ( + "os" + + "github.com/tus/tusd" + "github.com/tus/tusd/filestore" + "github.com/tus/tusd/limitedstore" + "github.com/tus/tusd/memorylocker" + "github.com/tus/tusd/s3store" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" +) + +var Composer *tusd.StoreComposer + +func CreateComposer() { + // Attempt to use S3 as a backend if the -s3-bucket option has been supplied. + // If not, we default to storing them locally on disk. + Composer = tusd.NewStoreComposer() + if Flags.S3Bucket == "" { + dir := Flags.UploadDir + + stdout.Printf("Using '%s' as directory storage.\n", dir) + if err := os.MkdirAll(dir, os.FileMode(0775)); err != nil { + stderr.Fatalf("Unable to ensure directory exists: %s", err) + } + + store := filestore.New(dir) + store.UseIn(Composer) + } else { + stdout.Printf("Using 's3://%s' as S3 bucket for storage.\n", Flags.S3Bucket) + + // Derive credentials from AWS_SECRET_ACCESS_KEY, AWS_ACCESS_KEY_ID and + // AWS_REGION environment variables. + credentials := aws.NewConfig().WithCredentials(credentials.NewEnvCredentials()) + store := s3store.New(Flags.S3Bucket, s3.New(session.New(), credentials)) + store.UseIn(Composer) + + locker := memorylocker.New() + locker.UseIn(Composer) + } + + storeSize := Flags.StoreSize + maxSize := Flags.MaxSize + + if storeSize > 0 { + limitedstore.New(storeSize, Composer.Core, Composer.Terminater).UseIn(Composer) + stdout.Printf("Using %.2fMB as storage size.\n", float64(storeSize)/1024/1024) + + // We need to ensure that a single upload can fit into the storage size + if maxSize > storeSize || maxSize == 0 { + Flags.MaxSize = storeSize + } + } + + stdout.Printf("Using %.2fMB as maximum size.\n", float64(Flags.MaxSize)/1024/1024) +} diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go new file mode 100644 index 0000000..20c8af2 --- /dev/null +++ b/cmd/tusd/cli/flags.go @@ -0,0 +1,54 @@ +package cli + +import ( + "flag" + "path/filepath" +) + +var Flags struct { + HttpHost string + HttpPort string + MaxSize int64 + UploadDir string + StoreSize int64 + Basepath string + Timeout int64 + S3Bucket string + HooksDir string + ShowVersion bool + ExposeMetrics bool + BehindProxy bool + + HooksInstalled bool +} + +func ParseFlags() { + flag.StringVar(&Flags.HttpHost, "host", "0.0.0.0", "Host to bind HTTP server to") + flag.StringVar(&Flags.HttpPort, "port", "1080", "Port to bind HTTP server to") + flag.Int64Var(&Flags.MaxSize, "max-size", 0, "Maximum size of a single upload in bytes") + flag.StringVar(&Flags.UploadDir, "dir", "./data", "Directory to store uploads in") + flag.Int64Var(&Flags.StoreSize, "store-size", 0, "Size of space allowed for storage") + flag.StringVar(&Flags.Basepath, "base-path", "/files/", "Basepath of the HTTP server") + flag.Int64Var(&Flags.Timeout, "timeout", 30*1000, "Read timeout for connections in milliseconds. A zero value means that reads will not timeout") + flag.StringVar(&Flags.S3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)") + flag.StringVar(&Flags.HooksDir, "hooks-dir", "", "Directory to search for available hooks scripts") + flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information") + flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage") + flag.BoolVar(&Flags.BehindProxy, "behind-proxy", false, "Respect X-Forwarded-* and similar headers which may be set by proxies") + + flag.Parse() + + if Flags.HooksDir != "" { + Flags.HooksDir, _ = filepath.Abs(Flags.HooksDir) + Flags.HooksInstalled = true + + stdout.Printf("Using '%s' for hooks", Flags.HooksDir) + } + + if Flags.UploadDir == "" && Flags.S3Bucket == "" { + stderr.Fatalf("Either an upload directory (using -dir) or an AWS S3 Bucket " + + "(using -s3-bucket) must be specified to start tusd but " + + "neither flag was provided. Please consult `tusd -help` for " + + "more information on these options.") + } +} diff --git a/cmd/tusd/cli/greeting.go b/cmd/tusd/cli/greeting.go new file mode 100644 index 0000000..47f8d31 --- /dev/null +++ b/cmd/tusd/cli/greeting.go @@ -0,0 +1,32 @@ +package cli + +import ( + "fmt" + "net/http" +) + +var greeting string + +func PrepareGreeting() { + greeting = fmt.Sprintf( + `Welcome to tusd +=============== + +Congratulations for setting up tusd! You are now part of the chosen elite and +able to experience the feeling of resumable uploads! We hope you are as excited +as we are (a lot)! + +However, there is something you should be aware of: While you got tusd +running (you did an awesome job!), this is the root directory of the server +and tus requests are only accepted at the %s route. + +So don't waste time, head over there and experience the future! + +Version = %s +GitCommit = %s +BuildDate = %s`, Flags.Basepath, VersionName, GitCommit, BuildDate) +} + +func DisplayGreeting(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(greeting)) +} diff --git a/cmd/tusd/cli/hooks.go b/cmd/tusd/cli/hooks.go new file mode 100644 index 0000000..c705185 --- /dev/null +++ b/cmd/tusd/cli/hooks.go @@ -0,0 +1,72 @@ +package cli + +import ( + "bytes" + "encoding/json" + "os" + "os/exec" + "strconv" + + "github.com/tus/tusd" +) + +type HookType string + +const ( + HookPostFinish HookType = "post-finish" + HookPostTerminate HookType = "post-terminate" +) + +func SetupHooks(handler *tusd.Handler) { + go func() { + for { + select { + case info := <-handler.CompleteUploads: + invokeHook(HookPostFinish, info) + case info := <-handler.TerminatedUploads: + invokeHook(HookPostTerminate, info) + } + } + }() +} + +func invokeHook(typ HookType, info tusd.FileInfo) { + switch typ { + case HookPostFinish: + stdout.Printf("Upload %s (%d bytes) finished\n", info.ID, info.Size) + case HookPostTerminate: + stdout.Printf("Upload %s terminated\n", info.ID) + } + + if !Flags.HooksInstalled { + return + } + + name := string(typ) + stdout.Printf("Invoking %s hook…\n", name) + + cmd := exec.Command(Flags.HooksDir + "/" + name) + env := os.Environ() + env = append(env, "TUS_ID="+info.ID) + env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Size, 10)) + + jsonInfo, err := json.Marshal(info) + if err != nil { + stderr.Printf("Error encoding JSON for hook: %s", err) + } + + reader := bytes.NewReader(jsonInfo) + cmd.Stdin = reader + + cmd.Env = env + cmd.Dir = Flags.HooksDir + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + go func() { + err := cmd.Run() + if err != nil { + stderr.Printf("Error running %s hook for %s: %s", name, info.ID, err) + } + }() +} diff --git a/cmd/tusd/cli/listener.go b/cmd/tusd/cli/listener.go new file mode 100644 index 0000000..4438c19 --- /dev/null +++ b/cmd/tusd/cli/listener.go @@ -0,0 +1,89 @@ +package cli + +import ( + "net" + "time" +) + +// Listener wraps a net.Listener, and gives a place to store the timeout +// parameters. On Accept, it will wrap the net.Conn with our own Conn for us. +// Original implementation taken from https://gist.github.com/jbardin/9663312 +// Thanks! <3 +type Listener struct { + net.Listener + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (l *Listener) Accept() (net.Conn, error) { + c, err := l.Listener.Accept() + if err != nil { + return nil, err + } + + go MetricsOpenConnections.Inc() + + tc := &Conn{ + Conn: c, + ReadTimeout: l.ReadTimeout, + WriteTimeout: l.WriteTimeout, + } + return tc, nil +} + +// Conn wraps a net.Conn, and sets a deadline for every read +// and write operation. +type Conn struct { + net.Conn + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +func (c *Conn) Read(b []byte) (int, error) { + var err error + if c.ReadTimeout > 0 { + err = c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) + } else { + err = c.Conn.SetReadDeadline(time.Time{}) + } + + if err != nil { + return 0, err + } + + return c.Conn.Read(b) +} + +func (c *Conn) Write(b []byte) (int, error) { + var err error + if c.WriteTimeout > 0 { + err = c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + } else { + err = c.Conn.SetWriteDeadline(time.Time{}) + } + + if err != nil { + return 0, err + } + + return c.Conn.Write(b) +} + +func (c *Conn) Close() error { + go MetricsOpenConnections.Dec() + return c.Conn.Close() +} + +func NewListener(addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + tl := &Listener{ + Listener: l, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + } + return tl, nil +} diff --git a/cmd/tusd/cli/log.go b/cmd/tusd/cli/log.go new file mode 100644 index 0000000..84917e8 --- /dev/null +++ b/cmd/tusd/cli/log.go @@ -0,0 +1,9 @@ +package cli + +import ( + "log" + "os" +) + +var stdout = log.New(os.Stdout, "[tusd] ", 0) +var stderr = log.New(os.Stderr, "[tusd] ", 0) diff --git a/cmd/tusd/cli/metrics.go b/cmd/tusd/cli/metrics.go new file mode 100644 index 0000000..2894b0e --- /dev/null +++ b/cmd/tusd/cli/metrics.go @@ -0,0 +1,22 @@ +package cli + +import ( + "net/http" + + "github.com/tus/tusd" + "github.com/tus/tusd/prometheuscollector" + + "github.com/prometheus/client_golang/prometheus" +) + +var MetricsOpenConnections = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "tusd_connections_open", + Help: "Current number of open connections.", +}) + +func SetupMetrics(handler *tusd.Handler) { + prometheus.MustRegister(MetricsOpenConnections) + prometheus.MustRegister(prometheuscollector.New(handler.Metrics)) + + http.Handle("/metrics", prometheus.Handler()) +} diff --git a/cmd/tusd/cli/serve.go b/cmd/tusd/cli/serve.go new file mode 100644 index 0000000..9b086ed --- /dev/null +++ b/cmd/tusd/cli/serve.go @@ -0,0 +1,53 @@ +package cli + +import ( + "net/http" + "time" + + "github.com/tus/tusd" +) + +func Serve() { + handler, err := tusd.NewHandler(tusd.Config{ + MaxSize: Flags.MaxSize, + BasePath: Flags.Basepath, + RespectForwardedHeaders: Flags.BehindProxy, + StoreComposer: Composer, + NotifyCompleteUploads: true, + NotifyTerminatedUploads: true, + }) + if err != nil { + stderr.Fatalf("Unable to create handler: %s", err) + } + + address := Flags.HttpHost + ":" + Flags.HttpPort + basepath := Flags.Basepath + + stdout.Printf("Using %s as address to listen.\n", address) + stdout.Printf("Using %s as the base path.\n", basepath) + stdout.Printf(Composer.Capabilities()) + + SetupHooks(handler) + + if Flags.ExposeMetrics { + SetupMetrics(handler) + } + + // Do not display the greeting if the tusd handler will be mounted at the root + // path. Else this would cause a "multiple registrations for /" panic. + if basepath != "/" { + http.HandleFunc("/", DisplayGreeting) + } + + http.Handle(basepath, http.StripPrefix(basepath, handler)) + + timeoutDuration := time.Duration(Flags.Timeout) * time.Millisecond + listener, err := NewListener(address, timeoutDuration, timeoutDuration) + if err != nil { + stderr.Fatalf("Unable to create listener: %s", err) + } + + if err = http.Serve(listener, nil); err != nil { + stderr.Fatalf("Unable to serve: %s", err) + } +} diff --git a/cmd/tusd/cli/version.go b/cmd/tusd/cli/version.go new file mode 100644 index 0000000..37504f4 --- /dev/null +++ b/cmd/tusd/cli/version.go @@ -0,0 +1,13 @@ +package cli + +import ( + "fmt" +) + +var VersionName = "n/a" +var GitCommit = "n/a" +var BuildDate = "n/a" + +func ShowVersion() { + fmt.Printf("Version: %s\nCommit: %s\nDate: %s\n", VersionName, GitCommit, BuildDate) +} diff --git a/cmd/tusd/main.go b/cmd/tusd/main.go index be51b6a..3e33c2a 100644 --- a/cmd/tusd/main.go +++ b/cmd/tusd/main.go @@ -1,329 +1,19 @@ package main import ( - "bytes" - "encoding/json" - "flag" - "fmt" - "log" - "net" - "net/http" - "os" - "os/exec" - "path/filepath" - "strconv" - "time" - - "github.com/tus/tusd" - "github.com/tus/tusd/filestore" - "github.com/tus/tusd/limitedstore" - "github.com/tus/tusd/memorylocker" - "github.com/tus/tusd/prometheuscollector" - "github.com/tus/tusd/s3store" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - - "github.com/prometheus/client_golang/prometheus" + "github.com/tus/tusd/cmd/tusd/cli" ) -var VersionName = "n/a" -var GitCommit = "n/a" -var BuildDate = "n/a" - -var httpHost string -var httpPort string -var maxSize int64 -var dir string -var storeSize int64 -var basepath string -var timeout int64 -var s3Bucket string -var hooksDir string -var version bool -var exposeMetrics bool -var behindProxy bool - -var stdout = log.New(os.Stdout, "[tusd] ", 0) -var stderr = log.New(os.Stderr, "[tusd] ", 0) - -var hookInstalled bool - -var greeting string - -var openConnections = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "connections_open_total", - Help: "Current number of open connections.", -}) - -func init() { - flag.StringVar(&httpHost, "host", "0.0.0.0", "Host to bind HTTP server to") - flag.StringVar(&httpPort, "port", "1080", "Port to bind HTTP server to") - flag.Int64Var(&maxSize, "max-size", 0, "Maximum size of a single upload in bytes") - flag.StringVar(&dir, "dir", "./data", "Directory to store uploads in") - flag.Int64Var(&storeSize, "store-size", 0, "Size of space allowed for storage") - flag.StringVar(&basepath, "base-path", "/files/", "Basepath of the HTTP server") - flag.Int64Var(&timeout, "timeout", 30*1000, "Read timeout for connections in milliseconds. A zero value means that reads will not timeout") - flag.StringVar(&s3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)") - flag.StringVar(&hooksDir, "hooks-dir", "", "Directory to search for available hooks scripts") - flag.BoolVar(&version, "version", false, "Print tusd version information") - flag.BoolVar(&exposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage") - flag.BoolVar(&behindProxy, "behind-proxy", false, "Respect X-Forwarded-* and similar headers which may be set by proxies") - - flag.Parse() - - if hooksDir != "" { - hooksDir, _ = filepath.Abs(hooksDir) - hookInstalled = true - - stdout.Printf("Using '%s' for hooks", hooksDir) - } - - greeting = fmt.Sprintf( - `Welcome to tusd -=============== - -Congratulations for setting up tusd! You are now part of the chosen elite and -able to experience the feeling of resumable uploads! We hope you are as excited -as we are (a lot)! - -However, there is something you should be aware of: While you got tusd -running (you did an awesome job!), this is the root directory of the server -and tus requests are only accepted at the %s route. - -So don't waste time, head over there and experience the future! - -Version = %s -GitCommit = %s -BuildDate = %s`, basepath, VersionName, GitCommit, BuildDate) -} - func main() { + cli.ParseFlags() + cli.PrepareGreeting() + // Print version and other information and exit if the -version flag has been - // passed. - if version { - fmt.Printf("Version: %s\nCommit: %s\nDate: %s\n", VersionName, GitCommit, BuildDate) - - return - } - - // Attempt to use S3 as a backend if the -s3-bucket option has been supplied. - // If not, we default to storing them locally on disk. - composer := tusd.NewStoreComposer() - if s3Bucket == "" { - stdout.Printf("Using '%s' as directory storage.\n", dir) - if err := os.MkdirAll(dir, os.FileMode(0775)); err != nil { - stderr.Fatalf("Unable to ensure directory exists: %s", err) - } - - store := filestore.New(dir) - store.UseIn(composer) + // passed else we will start the HTTP server + if cli.Flags.ShowVersion { + cli.ShowVersion() } else { - stdout.Printf("Using 's3://%s' as S3 bucket for storage.\n", s3Bucket) - - // Derive credentials from AWS_SECRET_ACCESS_KEY, AWS_ACCESS_KEY_ID and - // AWS_REGION environment variables. - credentials := aws.NewConfig().WithCredentials(credentials.NewEnvCredentials()) - store := s3store.New(s3Bucket, s3.New(session.New(), credentials)) - store.UseIn(composer) - - locker := memorylocker.New() - locker.UseIn(composer) - } - - if storeSize > 0 { - limitedstore.New(storeSize, composer.Core, composer.Terminater).UseIn(composer) - stdout.Printf("Using %.2fMB as storage size.\n", float64(storeSize)/1024/1024) - - // We need to ensure that a single upload can fit into the storage size - if maxSize > storeSize || maxSize == 0 { - maxSize = storeSize - } - } - - stdout.Printf("Using %.2fMB as maximum size.\n", float64(maxSize)/1024/1024) - - handler, err := tusd.NewHandler(tusd.Config{ - MaxSize: maxSize, - BasePath: basepath, - StoreComposer: composer, - RespectForwardedHeaders: behindProxy, - NotifyCompleteUploads: true, - NotifyTerminatedUploads: true, - }) - if err != nil { - stderr.Fatalf("Unable to create handler: %s", err) - } - - address := httpHost + ":" + httpPort - stdout.Printf("Using %s as address to listen.\n", address) - - stdout.Printf("Using %s as the base path.\n", basepath) - - stdout.Printf(composer.Capabilities()) - - go func() { - for { - select { - case info := <-handler.CompleteUploads: - invokeHook("post-finish", info) - case info := <-handler.TerminatedUploads: - invokeHook("post-terminate", info) - } - } - }() - - if exposeMetrics { - prometheus.MustRegister(openConnections) - prometheus.MustRegister(prometheuscollector.New(handler.Metrics)) - - http.Handle("/metrics", prometheus.Handler()) - } - - // Do not display the greeting if the tusd handler will be mounted at the root - // path. Else this would cause a "multiple registrations for /" panic. - if basepath != "/" { - http.HandleFunc("/", displayGreeting) - } - - http.Handle(basepath, http.StripPrefix(basepath, handler)) - - timeoutDuration := time.Duration(timeout) * time.Millisecond - listener, err := NewListener(address, timeoutDuration, timeoutDuration) - if err != nil { - stderr.Fatalf("Unable to create listener: %s", err) - } - - if err = http.Serve(listener, nil); err != nil { - stderr.Fatalf("Unable to serve: %s", err) + cli.CreateComposer() + cli.Serve() } } - -func invokeHook(name string, info tusd.FileInfo) { - switch name { - case "post-finish": - stdout.Printf("Upload %s (%d bytes) finished\n", info.ID, info.Size) - case "post-terminate": - stdout.Printf("Upload %s terminated\n", info.ID) - } - - if !hookInstalled { - return - } - - stdout.Printf("Invoking %s hook…\n", name) - - cmd := exec.Command(hooksDir + "/" + name) - env := os.Environ() - env = append(env, "TUS_ID="+info.ID) - env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Size, 10)) - - jsonInfo, err := json.Marshal(info) - if err != nil { - stderr.Printf("Error encoding JSON for hook: %s", err) - } - - reader := bytes.NewReader(jsonInfo) - cmd.Stdin = reader - - cmd.Env = env - cmd.Dir = hooksDir - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - go func() { - err := cmd.Run() - if err != nil { - stderr.Printf("Error running %s hook for %s: %s", name, info.ID, err) - } - }() -} - -func displayGreeting(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(greeting)) -} - -// Listener wraps a net.Listener, and gives a place to store the timeout -// parameters. On Accept, it will wrap the net.Conn with our own Conn for us. -// Original implementation taken from https://gist.github.com/jbardin/9663312 -// Thanks! <3 -type Listener struct { - net.Listener - ReadTimeout time.Duration - WriteTimeout time.Duration -} - -func (l *Listener) Accept() (net.Conn, error) { - c, err := l.Listener.Accept() - if err != nil { - return nil, err - } - - go openConnections.Inc() - - tc := &Conn{ - Conn: c, - ReadTimeout: l.ReadTimeout, - WriteTimeout: l.WriteTimeout, - } - return tc, nil -} - -// Conn wraps a net.Conn, and sets a deadline for every read -// and write operation. -type Conn struct { - net.Conn - ReadTimeout time.Duration - WriteTimeout time.Duration -} - -func (c *Conn) Read(b []byte) (int, error) { - var err error - if c.ReadTimeout > 0 { - err = c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) - } else { - err = c.Conn.SetReadDeadline(time.Time{}) - } - - if err != nil { - return 0, err - } - - return c.Conn.Read(b) -} - -func (c *Conn) Write(b []byte) (int, error) { - var err error - if c.WriteTimeout > 0 { - err = c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) - } else { - err = c.Conn.SetWriteDeadline(time.Time{}) - } - - if err != nil { - return 0, err - } - - return c.Conn.Write(b) -} - -func (c *Conn) Close() error { - go openConnections.Dec() - return c.Conn.Close() -} - -func NewListener(addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) { - l, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - - tl := &Listener{ - Listener: l, - ReadTimeout: readTimeout, - WriteTimeout: writeTimeout, - } - return tl, nil -}