Refactor tusd binary into multiple files

This commit is contained in:
Marius 2016-07-11 22:02:01 +02:00
parent a2a8c5ec86
commit bad81737b0
10 changed files with 414 additions and 319 deletions

61
cmd/tusd/cli/composer.go Normal file
View File

@ -0,0 +1,61 @@
package cli
import (
"os"
"github.com/tus/tusd"
"github.com/tus/tusd/filestore"
"github.com/tus/tusd/limitedstore"
"github.com/tus/tusd/memorylocker"
"github.com/tus/tusd/s3store"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
var Composer *tusd.StoreComposer
func CreateComposer() {
// Attempt to use S3 as a backend if the -s3-bucket option has been supplied.
// If not, we default to storing them locally on disk.
Composer = tusd.NewStoreComposer()
if Flags.S3Bucket == "" {
dir := Flags.UploadDir
stdout.Printf("Using '%s' as directory storage.\n", dir)
if err := os.MkdirAll(dir, os.FileMode(0775)); err != nil {
stderr.Fatalf("Unable to ensure directory exists: %s", err)
}
store := filestore.New(dir)
store.UseIn(Composer)
} else {
stdout.Printf("Using 's3://%s' as S3 bucket for storage.\n", Flags.S3Bucket)
// Derive credentials from AWS_SECRET_ACCESS_KEY, AWS_ACCESS_KEY_ID and
// AWS_REGION environment variables.
credentials := aws.NewConfig().WithCredentials(credentials.NewEnvCredentials())
store := s3store.New(Flags.S3Bucket, s3.New(session.New(), credentials))
store.UseIn(Composer)
locker := memorylocker.New()
locker.UseIn(Composer)
}
storeSize := Flags.StoreSize
maxSize := Flags.MaxSize
if storeSize > 0 {
limitedstore.New(storeSize, Composer.Core, Composer.Terminater).UseIn(Composer)
stdout.Printf("Using %.2fMB as storage size.\n", float64(storeSize)/1024/1024)
// We need to ensure that a single upload can fit into the storage size
if maxSize > storeSize || maxSize == 0 {
Flags.MaxSize = storeSize
}
}
stdout.Printf("Using %.2fMB as maximum size.\n", float64(Flags.MaxSize)/1024/1024)
}

54
cmd/tusd/cli/flags.go Normal file
View File

@ -0,0 +1,54 @@
package cli
import (
"flag"
"path/filepath"
)
var Flags struct {
HttpHost string
HttpPort string
MaxSize int64
UploadDir string
StoreSize int64
Basepath string
Timeout int64
S3Bucket string
HooksDir string
ShowVersion bool
ExposeMetrics bool
BehindProxy bool
HooksInstalled bool
}
func ParseFlags() {
flag.StringVar(&Flags.HttpHost, "host", "0.0.0.0", "Host to bind HTTP server to")
flag.StringVar(&Flags.HttpPort, "port", "1080", "Port to bind HTTP server to")
flag.Int64Var(&Flags.MaxSize, "max-size", 0, "Maximum size of a single upload in bytes")
flag.StringVar(&Flags.UploadDir, "dir", "./data", "Directory to store uploads in")
flag.Int64Var(&Flags.StoreSize, "store-size", 0, "Size of space allowed for storage")
flag.StringVar(&Flags.Basepath, "base-path", "/files/", "Basepath of the HTTP server")
flag.Int64Var(&Flags.Timeout, "timeout", 30*1000, "Read timeout for connections in milliseconds. A zero value means that reads will not timeout")
flag.StringVar(&Flags.S3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)")
flag.StringVar(&Flags.HooksDir, "hooks-dir", "", "Directory to search for available hooks scripts")
flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information")
flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage")
flag.BoolVar(&Flags.BehindProxy, "behind-proxy", false, "Respect X-Forwarded-* and similar headers which may be set by proxies")
flag.Parse()
if Flags.HooksDir != "" {
Flags.HooksDir, _ = filepath.Abs(Flags.HooksDir)
Flags.HooksInstalled = true
stdout.Printf("Using '%s' for hooks", Flags.HooksDir)
}
if Flags.UploadDir == "" && Flags.S3Bucket == "" {
stderr.Fatalf("Either an upload directory (using -dir) or an AWS S3 Bucket " +
"(using -s3-bucket) must be specified to start tusd but " +
"neither flag was provided. Please consult `tusd -help` for " +
"more information on these options.")
}
}

32
cmd/tusd/cli/greeting.go Normal file
View File

@ -0,0 +1,32 @@
package cli
import (
"fmt"
"net/http"
)
var greeting string
func PrepareGreeting() {
greeting = fmt.Sprintf(
`Welcome to tusd
===============
Congratulations for setting up tusd! You are now part of the chosen elite and
able to experience the feeling of resumable uploads! We hope you are as excited
as we are (a lot)!
However, there is something you should be aware of: While you got tusd
running (you did an awesome job!), this is the root directory of the server
and tus requests are only accepted at the %s route.
So don't waste time, head over there and experience the future!
Version = %s
GitCommit = %s
BuildDate = %s`, Flags.Basepath, VersionName, GitCommit, BuildDate)
}
func DisplayGreeting(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(greeting))
}

72
cmd/tusd/cli/hooks.go Normal file
View File

@ -0,0 +1,72 @@
package cli
import (
"bytes"
"encoding/json"
"os"
"os/exec"
"strconv"
"github.com/tus/tusd"
)
type HookType string
const (
HookPostFinish HookType = "post-finish"
HookPostTerminate HookType = "post-terminate"
)
func SetupHooks(handler *tusd.Handler) {
go func() {
for {
select {
case info := <-handler.CompleteUploads:
invokeHook(HookPostFinish, info)
case info := <-handler.TerminatedUploads:
invokeHook(HookPostTerminate, info)
}
}
}()
}
func invokeHook(typ HookType, info tusd.FileInfo) {
switch typ {
case HookPostFinish:
stdout.Printf("Upload %s (%d bytes) finished\n", info.ID, info.Size)
case HookPostTerminate:
stdout.Printf("Upload %s terminated\n", info.ID)
}
if !Flags.HooksInstalled {
return
}
name := string(typ)
stdout.Printf("Invoking %s hook…\n", name)
cmd := exec.Command(Flags.HooksDir + "/" + name)
env := os.Environ()
env = append(env, "TUS_ID="+info.ID)
env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Size, 10))
jsonInfo, err := json.Marshal(info)
if err != nil {
stderr.Printf("Error encoding JSON for hook: %s", err)
}
reader := bytes.NewReader(jsonInfo)
cmd.Stdin = reader
cmd.Env = env
cmd.Dir = Flags.HooksDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
go func() {
err := cmd.Run()
if err != nil {
stderr.Printf("Error running %s hook for %s: %s", name, info.ID, err)
}
}()
}

89
cmd/tusd/cli/listener.go Normal file
View File

@ -0,0 +1,89 @@
package cli
import (
"net"
"time"
)
// Listener wraps a net.Listener, and gives a place to store the timeout
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
// Original implementation taken from https://gist.github.com/jbardin/9663312
// Thanks! <3
type Listener struct {
net.Listener
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func (l *Listener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
go MetricsOpenConnections.Inc()
tc := &Conn{
Conn: c,
ReadTimeout: l.ReadTimeout,
WriteTimeout: l.WriteTimeout,
}
return tc, nil
}
// Conn wraps a net.Conn, and sets a deadline for every read
// and write operation.
type Conn struct {
net.Conn
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func (c *Conn) Read(b []byte) (int, error) {
var err error
if c.ReadTimeout > 0 {
err = c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
} else {
err = c.Conn.SetReadDeadline(time.Time{})
}
if err != nil {
return 0, err
}
return c.Conn.Read(b)
}
func (c *Conn) Write(b []byte) (int, error) {
var err error
if c.WriteTimeout > 0 {
err = c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
} else {
err = c.Conn.SetWriteDeadline(time.Time{})
}
if err != nil {
return 0, err
}
return c.Conn.Write(b)
}
func (c *Conn) Close() error {
go MetricsOpenConnections.Dec()
return c.Conn.Close()
}
func NewListener(addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
tl := &Listener{
Listener: l,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
}
return tl, nil
}

9
cmd/tusd/cli/log.go Normal file
View File

@ -0,0 +1,9 @@
package cli
import (
"log"
"os"
)
var stdout = log.New(os.Stdout, "[tusd] ", 0)
var stderr = log.New(os.Stderr, "[tusd] ", 0)

22
cmd/tusd/cli/metrics.go Normal file
View File

@ -0,0 +1,22 @@
package cli
import (
"net/http"
"github.com/tus/tusd"
"github.com/tus/tusd/prometheuscollector"
"github.com/prometheus/client_golang/prometheus"
)
var MetricsOpenConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tusd_connections_open",
Help: "Current number of open connections.",
})
func SetupMetrics(handler *tusd.Handler) {
prometheus.MustRegister(MetricsOpenConnections)
prometheus.MustRegister(prometheuscollector.New(handler.Metrics))
http.Handle("/metrics", prometheus.Handler())
}

53
cmd/tusd/cli/serve.go Normal file
View File

@ -0,0 +1,53 @@
package cli
import (
"net/http"
"time"
"github.com/tus/tusd"
)
func Serve() {
handler, err := tusd.NewHandler(tusd.Config{
MaxSize: Flags.MaxSize,
BasePath: Flags.Basepath,
RespectForwardedHeaders: Flags.BehindProxy,
StoreComposer: Composer,
NotifyCompleteUploads: true,
NotifyTerminatedUploads: true,
})
if err != nil {
stderr.Fatalf("Unable to create handler: %s", err)
}
address := Flags.HttpHost + ":" + Flags.HttpPort
basepath := Flags.Basepath
stdout.Printf("Using %s as address to listen.\n", address)
stdout.Printf("Using %s as the base path.\n", basepath)
stdout.Printf(Composer.Capabilities())
SetupHooks(handler)
if Flags.ExposeMetrics {
SetupMetrics(handler)
}
// Do not display the greeting if the tusd handler will be mounted at the root
// path. Else this would cause a "multiple registrations for /" panic.
if basepath != "/" {
http.HandleFunc("/", DisplayGreeting)
}
http.Handle(basepath, http.StripPrefix(basepath, handler))
timeoutDuration := time.Duration(Flags.Timeout) * time.Millisecond
listener, err := NewListener(address, timeoutDuration, timeoutDuration)
if err != nil {
stderr.Fatalf("Unable to create listener: %s", err)
}
if err = http.Serve(listener, nil); err != nil {
stderr.Fatalf("Unable to serve: %s", err)
}
}

13
cmd/tusd/cli/version.go Normal file
View File

@ -0,0 +1,13 @@
package cli
import (
"fmt"
)
var VersionName = "n/a"
var GitCommit = "n/a"
var BuildDate = "n/a"
func ShowVersion() {
fmt.Printf("Version: %s\nCommit: %s\nDate: %s\n", VersionName, GitCommit, BuildDate)
}

View File

@ -1,329 +1,19 @@
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"time"
"github.com/tus/tusd"
"github.com/tus/tusd/filestore"
"github.com/tus/tusd/limitedstore"
"github.com/tus/tusd/memorylocker"
"github.com/tus/tusd/prometheuscollector"
"github.com/tus/tusd/s3store"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/prometheus/client_golang/prometheus"
"github.com/tus/tusd/cmd/tusd/cli"
)
var VersionName = "n/a"
var GitCommit = "n/a"
var BuildDate = "n/a"
var httpHost string
var httpPort string
var maxSize int64
var dir string
var storeSize int64
var basepath string
var timeout int64
var s3Bucket string
var hooksDir string
var version bool
var exposeMetrics bool
var behindProxy bool
var stdout = log.New(os.Stdout, "[tusd] ", 0)
var stderr = log.New(os.Stderr, "[tusd] ", 0)
var hookInstalled bool
var greeting string
var openConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "connections_open_total",
Help: "Current number of open connections.",
})
func init() {
flag.StringVar(&httpHost, "host", "0.0.0.0", "Host to bind HTTP server to")
flag.StringVar(&httpPort, "port", "1080", "Port to bind HTTP server to")
flag.Int64Var(&maxSize, "max-size", 0, "Maximum size of a single upload in bytes")
flag.StringVar(&dir, "dir", "./data", "Directory to store uploads in")
flag.Int64Var(&storeSize, "store-size", 0, "Size of space allowed for storage")
flag.StringVar(&basepath, "base-path", "/files/", "Basepath of the HTTP server")
flag.Int64Var(&timeout, "timeout", 30*1000, "Read timeout for connections in milliseconds. A zero value means that reads will not timeout")
flag.StringVar(&s3Bucket, "s3-bucket", "", "Use AWS S3 with this bucket as storage backend (requires the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables to be set)")
flag.StringVar(&hooksDir, "hooks-dir", "", "Directory to search for available hooks scripts")
flag.BoolVar(&version, "version", false, "Print tusd version information")
flag.BoolVar(&exposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage")
flag.BoolVar(&behindProxy, "behind-proxy", false, "Respect X-Forwarded-* and similar headers which may be set by proxies")
flag.Parse()
if hooksDir != "" {
hooksDir, _ = filepath.Abs(hooksDir)
hookInstalled = true
stdout.Printf("Using '%s' for hooks", hooksDir)
}
greeting = fmt.Sprintf(
`Welcome to tusd
===============
Congratulations for setting up tusd! You are now part of the chosen elite and
able to experience the feeling of resumable uploads! We hope you are as excited
as we are (a lot)!
However, there is something you should be aware of: While you got tusd
running (you did an awesome job!), this is the root directory of the server
and tus requests are only accepted at the %s route.
So don't waste time, head over there and experience the future!
Version = %s
GitCommit = %s
BuildDate = %s`, basepath, VersionName, GitCommit, BuildDate)
}
func main() {
cli.ParseFlags()
cli.PrepareGreeting()
// Print version and other information and exit if the -version flag has been
// passed.
if version {
fmt.Printf("Version: %s\nCommit: %s\nDate: %s\n", VersionName, GitCommit, BuildDate)
return
}
// Attempt to use S3 as a backend if the -s3-bucket option has been supplied.
// If not, we default to storing them locally on disk.
composer := tusd.NewStoreComposer()
if s3Bucket == "" {
stdout.Printf("Using '%s' as directory storage.\n", dir)
if err := os.MkdirAll(dir, os.FileMode(0775)); err != nil {
stderr.Fatalf("Unable to ensure directory exists: %s", err)
}
store := filestore.New(dir)
store.UseIn(composer)
// passed else we will start the HTTP server
if cli.Flags.ShowVersion {
cli.ShowVersion()
} else {
stdout.Printf("Using 's3://%s' as S3 bucket for storage.\n", s3Bucket)
// Derive credentials from AWS_SECRET_ACCESS_KEY, AWS_ACCESS_KEY_ID and
// AWS_REGION environment variables.
credentials := aws.NewConfig().WithCredentials(credentials.NewEnvCredentials())
store := s3store.New(s3Bucket, s3.New(session.New(), credentials))
store.UseIn(composer)
locker := memorylocker.New()
locker.UseIn(composer)
}
if storeSize > 0 {
limitedstore.New(storeSize, composer.Core, composer.Terminater).UseIn(composer)
stdout.Printf("Using %.2fMB as storage size.\n", float64(storeSize)/1024/1024)
// We need to ensure that a single upload can fit into the storage size
if maxSize > storeSize || maxSize == 0 {
maxSize = storeSize
}
}
stdout.Printf("Using %.2fMB as maximum size.\n", float64(maxSize)/1024/1024)
handler, err := tusd.NewHandler(tusd.Config{
MaxSize: maxSize,
BasePath: basepath,
StoreComposer: composer,
RespectForwardedHeaders: behindProxy,
NotifyCompleteUploads: true,
NotifyTerminatedUploads: true,
})
if err != nil {
stderr.Fatalf("Unable to create handler: %s", err)
}
address := httpHost + ":" + httpPort
stdout.Printf("Using %s as address to listen.\n", address)
stdout.Printf("Using %s as the base path.\n", basepath)
stdout.Printf(composer.Capabilities())
go func() {
for {
select {
case info := <-handler.CompleteUploads:
invokeHook("post-finish", info)
case info := <-handler.TerminatedUploads:
invokeHook("post-terminate", info)
}
}
}()
if exposeMetrics {
prometheus.MustRegister(openConnections)
prometheus.MustRegister(prometheuscollector.New(handler.Metrics))
http.Handle("/metrics", prometheus.Handler())
}
// Do not display the greeting if the tusd handler will be mounted at the root
// path. Else this would cause a "multiple registrations for /" panic.
if basepath != "/" {
http.HandleFunc("/", displayGreeting)
}
http.Handle(basepath, http.StripPrefix(basepath, handler))
timeoutDuration := time.Duration(timeout) * time.Millisecond
listener, err := NewListener(address, timeoutDuration, timeoutDuration)
if err != nil {
stderr.Fatalf("Unable to create listener: %s", err)
}
if err = http.Serve(listener, nil); err != nil {
stderr.Fatalf("Unable to serve: %s", err)
cli.CreateComposer()
cli.Serve()
}
}
func invokeHook(name string, info tusd.FileInfo) {
switch name {
case "post-finish":
stdout.Printf("Upload %s (%d bytes) finished\n", info.ID, info.Size)
case "post-terminate":
stdout.Printf("Upload %s terminated\n", info.ID)
}
if !hookInstalled {
return
}
stdout.Printf("Invoking %s hook…\n", name)
cmd := exec.Command(hooksDir + "/" + name)
env := os.Environ()
env = append(env, "TUS_ID="+info.ID)
env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Size, 10))
jsonInfo, err := json.Marshal(info)
if err != nil {
stderr.Printf("Error encoding JSON for hook: %s", err)
}
reader := bytes.NewReader(jsonInfo)
cmd.Stdin = reader
cmd.Env = env
cmd.Dir = hooksDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
go func() {
err := cmd.Run()
if err != nil {
stderr.Printf("Error running %s hook for %s: %s", name, info.ID, err)
}
}()
}
func displayGreeting(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(greeting))
}
// Listener wraps a net.Listener, and gives a place to store the timeout
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
// Original implementation taken from https://gist.github.com/jbardin/9663312
// Thanks! <3
type Listener struct {
net.Listener
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func (l *Listener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
go openConnections.Inc()
tc := &Conn{
Conn: c,
ReadTimeout: l.ReadTimeout,
WriteTimeout: l.WriteTimeout,
}
return tc, nil
}
// Conn wraps a net.Conn, and sets a deadline for every read
// and write operation.
type Conn struct {
net.Conn
ReadTimeout time.Duration
WriteTimeout time.Duration
}
func (c *Conn) Read(b []byte) (int, error) {
var err error
if c.ReadTimeout > 0 {
err = c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
} else {
err = c.Conn.SetReadDeadline(time.Time{})
}
if err != nil {
return 0, err
}
return c.Conn.Read(b)
}
func (c *Conn) Write(b []byte) (int, error) {
var err error
if c.WriteTimeout > 0 {
err = c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
} else {
err = c.Conn.SetWriteDeadline(time.Time{})
}
if err != nil {
return 0, err
}
return c.Conn.Write(b)
}
func (c *Conn) Close() error {
go openConnections.Dec()
return c.Conn.Close()
}
func NewListener(addr string, readTimeout, writeTimeout time.Duration) (net.Listener, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
tl := &Listener{
Listener: l,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
}
return tl, nil
}