cli: Add option for run hooks from Go plugin
Squashed commit of the following: commit 1b80f51f94cf860ba8516baed4b65e9ded6441fe Author: Marius <maerious@gmail.com> Date: Mon Jun 10 11:41:30 2019 +0200 Minor improvements commit 98daad5f9fa55895a7ae6397b5fcaa353e240954 Author: Marius <maerious@gmail.com> Date: Fri Jun 7 13:26:14 2019 +0200 Extract File and Http hooks into own structs
This commit is contained in:
parent
10e0bb1fb9
commit
05d9a0ba98
|
@ -25,6 +25,7 @@ var Flags struct {
|
|||
HttpHooksRetry int
|
||||
HttpHooksBackoff int
|
||||
HooksStopUploadCode int
|
||||
PluginHookPath string
|
||||
ShowVersion bool
|
||||
ExposeMetrics bool
|
||||
MetricsPath string
|
||||
|
@ -53,6 +54,7 @@ func ParseFlags() {
|
|||
flag.IntVar(&Flags.HttpHooksRetry, "hooks-http-retry", 3, "Number of times to retry on a 500 or network timeout")
|
||||
flag.IntVar(&Flags.HttpHooksBackoff, "hooks-http-backoff", 1, "Number of seconds to wait before retrying each retry")
|
||||
flag.IntVar(&Flags.HooksStopUploadCode, "hooks-stop-code", 0, "Return code from post-receive hook which causes tusd to stop and delete the current upload. A zero value means that no uploads will be stopped")
|
||||
flag.StringVar(&Flags.PluginHookPath, "hooks-plugin", "", "Path to a Go plugin for loading hook functions (only supported on Linux and macOS; highly EXPERIMENTAL and may BREAK in the future)")
|
||||
flag.BoolVar(&Flags.ShowVersion, "version", false, "Print tusd version information")
|
||||
flag.BoolVar(&Flags.ExposeMetrics, "expose-metrics", true, "Expose metrics about tusd usage")
|
||||
flag.StringVar(&Flags.MetricsPath, "metrics-path", "/metrics", "Path under which the metrics endpoint will be accessible")
|
||||
|
|
|
@ -1,54 +1,27 @@
|
|||
package cli
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/tus/tusd"
|
||||
|
||||
"github.com/sethgrid/pester"
|
||||
"github.com/tus/tusd/cmd/tusd/cli/hooks"
|
||||
)
|
||||
|
||||
type HookType string
|
||||
|
||||
const (
|
||||
HookPostFinish HookType = "post-finish"
|
||||
HookPostTerminate HookType = "post-terminate"
|
||||
HookPostReceive HookType = "post-receive"
|
||||
HookPostCreate HookType = "post-create"
|
||||
HookPreCreate HookType = "pre-create"
|
||||
)
|
||||
var hookHandler hooks.HookHandler = nil
|
||||
|
||||
type hookDataStore struct {
|
||||
tusd.DataStore
|
||||
}
|
||||
|
||||
type hookError struct {
|
||||
error
|
||||
statusCode int
|
||||
body []byte
|
||||
}
|
||||
|
||||
func (herr hookError) StatusCode() int {
|
||||
return herr.statusCode
|
||||
}
|
||||
|
||||
func (herr hookError) Body() []byte {
|
||||
return herr.body
|
||||
}
|
||||
|
||||
func (store hookDataStore) NewUpload(info tusd.FileInfo) (id string, err error) {
|
||||
if output, err := invokeHookSync(HookPreCreate, info, true); err != nil {
|
||||
if hookErr, ok := err.(hookError); ok {
|
||||
hookErr.error = fmt.Errorf("pre-create hook failed: %s", err)
|
||||
return "", hookErr
|
||||
if output, err := invokeHookSync(hooks.HookPreCreate, info, true); err != nil {
|
||||
if hookErr, ok := err.(hooks.HookError); ok {
|
||||
return "", hooks.NewHookError(
|
||||
fmt.Errorf("pre-create hook failed: %s", err),
|
||||
hookErr.StatusCode(),
|
||||
hookErr.Body(),
|
||||
)
|
||||
}
|
||||
return "", fmt.Errorf("pre-create hook failed: %s\n%s", err, string(output))
|
||||
}
|
||||
|
@ -56,17 +29,41 @@ func (store hookDataStore) NewUpload(info tusd.FileInfo) (id string, err error)
|
|||
}
|
||||
|
||||
func SetupHookMetrics() {
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(HookPostFinish)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(HookPostTerminate)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(HookPostReceive)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(HookPostCreate)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(HookPreCreate)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostFinish)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostTerminate)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostReceive)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPostCreate)).Add(0)
|
||||
MetricsHookErrorsTotal.WithLabelValues(string(hooks.HookPreCreate)).Add(0)
|
||||
}
|
||||
|
||||
func SetupPreHooks(composer *tusd.StoreComposer) error {
|
||||
if Flags.FileHooksDir != "" {
|
||||
hookHandler = &hooks.FileHook{
|
||||
Directory: Flags.FileHooksDir,
|
||||
}
|
||||
} else if Flags.HttpHooksEndpoint != "" {
|
||||
hookHandler = &hooks.HttpHook{
|
||||
Endpoint: Flags.HttpHooksEndpoint,
|
||||
MaxRetries: Flags.HttpHooksRetry,
|
||||
Backoff: Flags.HttpHooksBackoff,
|
||||
}
|
||||
} else if Flags.PluginHookPath != "" {
|
||||
hookHandler = &hooks.PluginHook{
|
||||
Path: Flags.PluginHookPath,
|
||||
}
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := hookHandler.Setup(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func SetupPreHooks(composer *tusd.StoreComposer) {
|
||||
composer.UseCore(hookDataStore{
|
||||
DataStore: composer.Core,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetupPostHooks(handler *tusd.Handler) {
|
||||
|
@ -74,50 +71,41 @@ func SetupPostHooks(handler *tusd.Handler) {
|
|||
for {
|
||||
select {
|
||||
case info := <-handler.CompleteUploads:
|
||||
invokeHook(HookPostFinish, info)
|
||||
invokeHookAsync(hooks.HookPostFinish, info)
|
||||
case info := <-handler.TerminatedUploads:
|
||||
invokeHook(HookPostTerminate, info)
|
||||
invokeHookAsync(hooks.HookPostTerminate, info)
|
||||
case info := <-handler.UploadProgress:
|
||||
invokeHook(HookPostReceive, info)
|
||||
invokeHookAsync(hooks.HookPostReceive, info)
|
||||
case info := <-handler.CreatedUploads:
|
||||
invokeHook(HookPostCreate, info)
|
||||
invokeHookAsync(hooks.HookPostCreate, info)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func invokeHook(typ HookType, info tusd.FileInfo) {
|
||||
func invokeHookAsync(typ hooks.HookType, info tusd.FileInfo) {
|
||||
go func() {
|
||||
// Error handling is taken care by the function.
|
||||
_, _ = invokeHookSync(typ, info, false)
|
||||
}()
|
||||
}
|
||||
|
||||
func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) {
|
||||
func invokeHookSync(typ hooks.HookType, info tusd.FileInfo, captureOutput bool) ([]byte, error) {
|
||||
switch typ {
|
||||
case HookPostFinish:
|
||||
case hooks.HookPostFinish:
|
||||
logEv(stdout, "UploadFinished", "id", info.ID, "size", strconv.FormatInt(info.Size, 10))
|
||||
case HookPostTerminate:
|
||||
case hooks.HookPostTerminate:
|
||||
logEv(stdout, "UploadTerminated", "id", info.ID)
|
||||
}
|
||||
|
||||
if !Flags.FileHooksInstalled && !Flags.HttpHooksInstalled {
|
||||
if hookHandler == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
name := string(typ)
|
||||
logEv(stdout, "HookInvocationStart", "type", name, "id", info.ID)
|
||||
|
||||
output := []byte{}
|
||||
err := error(nil)
|
||||
returnCode := 0
|
||||
|
||||
if Flags.FileHooksInstalled {
|
||||
output, returnCode, err = invokeFileHook(name, typ, info, captureOutput)
|
||||
}
|
||||
|
||||
if Flags.HttpHooksInstalled {
|
||||
output, returnCode, err = invokeHttpHook(name, typ, info, captureOutput)
|
||||
}
|
||||
output, returnCode, err := hookHandler.InvokeHook(typ, info, captureOutput)
|
||||
|
||||
if err != nil {
|
||||
logEv(stderr, "HookInvocationError", "type", string(typ), "id", info.ID, "error", err.Error())
|
||||
|
@ -126,7 +114,7 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt
|
|||
logEv(stdout, "HookInvocationFinish", "type", string(typ), "id", info.ID)
|
||||
}
|
||||
|
||||
if typ == HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode {
|
||||
if typ == hooks.HookPostReceive && Flags.HooksStopUploadCode != 0 && Flags.HooksStopUploadCode == returnCode {
|
||||
logEv(stdout, "HookStopUpload", "id", info.ID)
|
||||
|
||||
info.StopUpload()
|
||||
|
@ -134,88 +122,3 @@ func invokeHookSync(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byt
|
|||
|
||||
return output, err
|
||||
}
|
||||
|
||||
func invokeHttpHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
|
||||
jsonInfo, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", Flags.HttpHooksEndpoint, bytes.NewBuffer(jsonInfo))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
req.Header.Set("Hook-Name", name)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// Use linear backoff strategy with the user defined values.
|
||||
client := pester.New()
|
||||
client.KeepLog = true
|
||||
client.MaxRetries = Flags.HttpHooksRetry
|
||||
client.Backoff = func(_ int) time.Duration {
|
||||
return time.Duration(Flags.HttpHooksBackoff) * time.Second
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
return body, resp.StatusCode, hookError{fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body}
|
||||
}
|
||||
|
||||
if captureOutput {
|
||||
return body, resp.StatusCode, err
|
||||
}
|
||||
|
||||
return nil, resp.StatusCode, err
|
||||
}
|
||||
|
||||
func invokeFileHook(name string, typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
|
||||
hookPath := Flags.FileHooksDir + string(os.PathSeparator) + name
|
||||
cmd := exec.Command(hookPath)
|
||||
env := os.Environ()
|
||||
env = append(env, "TUS_ID="+info.ID)
|
||||
env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Size, 10))
|
||||
env = append(env, "TUS_OFFSET="+strconv.FormatInt(info.Offset, 10))
|
||||
|
||||
jsonInfo, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
reader := bytes.NewReader(jsonInfo)
|
||||
cmd.Stdin = reader
|
||||
|
||||
cmd.Env = env
|
||||
cmd.Dir = Flags.FileHooksDir
|
||||
cmd.Stderr = os.Stderr
|
||||
|
||||
// If `captureOutput` is true, this function will return the output (both,
|
||||
// stderr and stdout), else it will use this process' stdout
|
||||
var output []byte
|
||||
if !captureOutput {
|
||||
cmd.Stdout = os.Stdout
|
||||
err = cmd.Run()
|
||||
} else {
|
||||
output, err = cmd.Output()
|
||||
}
|
||||
|
||||
// Ignore the error, only, if the hook's file could not be found. This usually
|
||||
// means that the user is only using a subset of the available hooks.
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
returnCode := cmd.ProcessState.ExitCode()
|
||||
|
||||
return output, returnCode, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package hooks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
|
||||
"github.com/tus/tusd"
|
||||
)
|
||||
|
||||
type FileHook struct {
|
||||
Directory string
|
||||
}
|
||||
|
||||
func (_ FileHook) Setup() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h FileHook) InvokeHook(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
|
||||
hookPath := h.Directory + string(os.PathSeparator) + string(typ)
|
||||
cmd := exec.Command(hookPath)
|
||||
env := os.Environ()
|
||||
env = append(env, "TUS_ID="+info.ID)
|
||||
env = append(env, "TUS_SIZE="+strconv.FormatInt(info.Size, 10))
|
||||
env = append(env, "TUS_OFFSET="+strconv.FormatInt(info.Offset, 10))
|
||||
|
||||
jsonInfo, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
reader := bytes.NewReader(jsonInfo)
|
||||
cmd.Stdin = reader
|
||||
|
||||
cmd.Env = env
|
||||
cmd.Dir = h.Directory
|
||||
cmd.Stderr = os.Stderr
|
||||
|
||||
// If `captureOutput` is true, this function will return the output (both,
|
||||
// stderr and stdout), else it will use this process' stdout
|
||||
var output []byte
|
||||
if !captureOutput {
|
||||
cmd.Stdout = os.Stdout
|
||||
err = cmd.Run()
|
||||
} else {
|
||||
output, err = cmd.Output()
|
||||
}
|
||||
|
||||
// Ignore the error, only, if the hook's file could not be found. This usually
|
||||
// means that the user is only using a subset of the available hooks.
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
returnCode := cmd.ProcessState.ExitCode()
|
||||
|
||||
return output, returnCode, err
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package hooks
|
||||
|
||||
import (
|
||||
"github.com/tus/tusd"
|
||||
)
|
||||
|
||||
type HookHandler interface {
|
||||
Setup() error
|
||||
InvokeHook(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error)
|
||||
}
|
||||
|
||||
type HookType string
|
||||
|
||||
const (
|
||||
HookPostFinish HookType = "post-finish"
|
||||
HookPostTerminate HookType = "post-terminate"
|
||||
HookPostReceive HookType = "post-receive"
|
||||
HookPostCreate HookType = "post-create"
|
||||
HookPreCreate HookType = "pre-create"
|
||||
)
|
||||
|
||||
type hookDataStore struct {
|
||||
tusd.DataStore
|
||||
}
|
||||
|
||||
type HookError struct {
|
||||
error
|
||||
statusCode int
|
||||
body []byte
|
||||
}
|
||||
|
||||
func NewHookError(err error, statusCode int, body []byte) HookError {
|
||||
return HookError{err, statusCode, body}
|
||||
}
|
||||
|
||||
func (herr HookError) StatusCode() int {
|
||||
return herr.statusCode
|
||||
}
|
||||
|
||||
func (herr HookError) Body() []byte {
|
||||
return herr.body
|
||||
}
|
||||
|
||||
func (herr HookError) Error() string {
|
||||
return herr.error.Error()
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
package hooks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/tus/tusd"
|
||||
|
||||
"github.com/sethgrid/pester"
|
||||
)
|
||||
|
||||
type HttpHook struct {
|
||||
Endpoint string
|
||||
MaxRetries int
|
||||
Backoff int
|
||||
}
|
||||
|
||||
func (_ HttpHook) Setup() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h HttpHook) InvokeHook(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
|
||||
jsonInfo, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", h.Endpoint, bytes.NewBuffer(jsonInfo))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
req.Header.Set("Hook-Name", string(typ))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// TODO: Can we initialize this in Setup()?
|
||||
// Use linear backoff strategy with the user defined values.
|
||||
client := pester.New()
|
||||
client.KeepLog = true
|
||||
client.MaxRetries = h.MaxRetries
|
||||
client.Backoff = func(_ int) time.Duration {
|
||||
return time.Duration(h.Backoff) * time.Second
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
return body, resp.StatusCode, NewHookError(fmt.Errorf("endpoint returned: %s", resp.Status), resp.StatusCode, body)
|
||||
}
|
||||
|
||||
if captureOutput {
|
||||
return body, resp.StatusCode, err
|
||||
}
|
||||
|
||||
return nil, resp.StatusCode, err
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package hooks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"plugin"
|
||||
|
||||
"github.com/tus/tusd"
|
||||
)
|
||||
|
||||
type PluginHookHandler interface {
|
||||
PreCreate(info tusd.FileInfo) error
|
||||
PostCreate(info tusd.FileInfo) error
|
||||
PostReceive(info tusd.FileInfo) error
|
||||
PostFinish(info tusd.FileInfo) error
|
||||
PostTerminate(info tusd.FileInfo) error
|
||||
}
|
||||
|
||||
type PluginHook struct {
|
||||
Path string
|
||||
|
||||
handler PluginHookHandler
|
||||
}
|
||||
|
||||
func (h *PluginHook) Setup() error {
|
||||
p, err := plugin.Open(h.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
symbol, err := p.Lookup("TusdHookHandler")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
handler, ok := symbol.(*PluginHookHandler)
|
||||
if !ok {
|
||||
return fmt.Errorf("hooks: could not cast TusdHookHandler from %s into PluginHookHandler interface", h.Path)
|
||||
}
|
||||
|
||||
h.handler = *handler
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h PluginHook) InvokeHook(typ HookType, info tusd.FileInfo, captureOutput bool) ([]byte, int, error) {
|
||||
var err error
|
||||
switch typ {
|
||||
case HookPostFinish:
|
||||
err = h.handler.PostFinish(info)
|
||||
case HookPostTerminate:
|
||||
err = h.handler.PostTerminate(info)
|
||||
case HookPostReceive:
|
||||
err = h.handler.PostReceive(info)
|
||||
case HookPostCreate:
|
||||
err = h.handler.PostCreate(info)
|
||||
case HookPreCreate:
|
||||
err = h.handler.PreCreate(info)
|
||||
default:
|
||||
err = fmt.Errorf("hooks: unknown hook named %s", typ)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, 1, err
|
||||
}
|
||||
|
||||
return nil, 0, nil
|
||||
}
|
|
@ -7,8 +7,8 @@ import (
|
|||
"github.com/tus/tusd"
|
||||
)
|
||||
|
||||
var stdout = log.New(os.Stdout, "[tusd] ", 0)
|
||||
var stderr = log.New(os.Stderr, "[tusd] ", 0)
|
||||
var stdout = log.New(os.Stdout, "[tusd] ", log.Ldate|log.Ltime)
|
||||
var stderr = log.New(os.Stderr, "[tusd] ", log.Ldate|log.Ltime)
|
||||
|
||||
func logEv(logOutput *log.Logger, eventName string, details ...string) {
|
||||
tusd.LogEvent(logOutput, eventName, details...)
|
||||
|
|
|
@ -15,7 +15,9 @@ import (
|
|||
// specified, in which case a different socket creation and binding mechanism
|
||||
// is put in place.
|
||||
func Serve() {
|
||||
SetupPreHooks(Composer)
|
||||
if err := SetupPreHooks(Composer); err != nil {
|
||||
stderr.Fatalf("Unable to setup hooks for handler: %s", err)
|
||||
}
|
||||
|
||||
handler, err := tusd.NewHandler(tusd.Config{
|
||||
MaxSize: Flags.MaxSize,
|
||||
|
|
Loading…
Reference in New Issue