From 9af87d50ceb3ab0e79d1be40b59279c8f90a33ca Mon Sep 17 00:00:00 2001 From: Anders Chen Date: Sat, 10 Nov 2018 21:05:44 +0100 Subject: [PATCH] Add etcd3 locker (#202) * First pass at etcd3 locker * Add etcd packages to vendor ignore attribute * Properly exclude etcd3locker from earlier versions of golang * Etcd test to only run on go1.9 and go1.10 * Do not export internal etcd3locker methods * context.TODO -> context.Background * Return errors instead of logging * Enforce a 1.5s timeout on acquiring a lease from etcd * Allow etcd3 concurreny.NewSession manage KeepAlive connection - Introduce LockerOptions to allow a custom session TTL (default: 60s); etcd3Locker can be initialized with NewWithLockerOptions if one wants granular control over TTL and prefix used for etcd3 keys - Keep NewWithPrefix backwards compatible by calling NewWithLockerOptions * Add comment on locker_test.go regarding testing the session KeepAlive * Re-export main type * Try to address missing github.com/gogo/protobuf/gogoproto package * Update etcd package import to use go.etcd.io/etcd/clientv3 * Use forked go-etcd-harness for testing * Add more extensive package overview / docs * go fmt * Fix test script * Add downloaded etcd binary to path --- .scripts/test_all.sh | 24 +++++- etcd3locker/lock.go | 47 +++++++++++ etcd3locker/locker.go | 145 ++++++++++++++++++++++++++++++++++ etcd3locker/locker_options.go | 58 ++++++++++++++ etcd3locker/locker_test.go | 59 ++++++++++++++ vendor/vendor.json | 2 +- 6 files changed, 333 insertions(+), 2 deletions(-) create mode 100644 etcd3locker/lock.go create mode 100644 etcd3locker/locker.go create mode 100644 etcd3locker/locker_options.go create mode 100644 etcd3locker/locker_test.go diff --git a/.scripts/test_all.sh b/.scripts/test_all.sh index d61b0ec..652cb14 100755 --- a/.scripts/test_all.sh +++ b/.scripts/test_all.sh @@ -5,7 +5,7 @@ set -e # Find all packages containing Go source code inside the current directory packages=$(find ./ -maxdepth 2 -name '*.go' -printf '%h\n' | sort | uniq) -# The consul package only supports Go1.8+ and therefore we will only run the +# The consul package only supports Go1.10+ and therefore we will only run the # corresponding tests on these versions. goversion=$(go version) if [[ "$goversion" == *"go1.5"* ]] || @@ -26,6 +26,28 @@ else go get -u github.com/hashicorp/consul/... fi +install_etcd_pkgs() { + ETCD_VERSION="3.3.10" + go get -u go.etcd.io/etcd/clientv3 + go get -u github.com/chen-anders/go-etcd-harness + wget -q -O /tmp/etcd.tar.gz "https://github.com/etcd-io/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz" + tar xvzf /tmp/etcd.tar.gz -C /tmp + export PATH="$PATH:/tmp/etcd-v$ETCD_VERSION-linux-amd64" +} + +# The etcd 3.3.x package only supports Go1.9+ and therefore +# we will only run the corresponding tests on these versions. +if [[ "$goversion" == *"go1.5"* ]] || + [[ "$goversion" == *"go1.6"* ]] || + [[ "$goversion" == *"go1.7"* ]] || + [[ "$goversion" == *"go1.8"* ]]; then + echo "Skipping tests requiring etcd3locker, which is not supported on $goversion" + packages=$(echo "$packages" | sed '/etcd3locker/d') +else + # Install the etcd packages which are not vendored. + install_etcd_pkgs +fi + # Install the AWS SDK and Prometheus client which is explicitly not vendored go get -u github.com/aws/aws-sdk-go/... go get -u github.com/prometheus/client_golang/prometheus diff --git a/etcd3locker/lock.go b/etcd3locker/lock.go new file mode 100644 index 0000000..1f67238 --- /dev/null +++ b/etcd3locker/lock.go @@ -0,0 +1,47 @@ +// Tested on etcd 3.1+ +package etcd3locker + +import ( + "context" + "time" + + "github.com/tus/tusd" + "go.etcd.io/etcd/clientv3/concurrency" +) + +type etcd3Lock struct { + Id string + Mutex *concurrency.Mutex + Session *concurrency.Session +} + +func newEtcd3Lock(session *concurrency.Session, id string) *etcd3Lock { + return &etcd3Lock{ + Mutex: concurrency.NewMutex(session, id), + Session: session, + } +} + +func (lock *etcd3Lock) Acquire() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // this is a blocking call; if we receive DeadlineExceeded + // the lock is most likely already taken + if err := lock.Mutex.Lock(ctx); err != nil { + if err == context.DeadlineExceeded { + return tusd.ErrFileLocked + } else { + return err + } + } + return nil +} + +func (lock *etcd3Lock) Release() error { + return lock.Mutex.Unlock(context.Background()) +} + +func (lock *etcd3Lock) CloseSession() error { + return lock.Session.Close() +} diff --git a/etcd3locker/locker.go b/etcd3locker/locker.go new file mode 100644 index 0000000..997efd3 --- /dev/null +++ b/etcd3locker/locker.go @@ -0,0 +1,145 @@ +// Package etcd3locker provides a locking mechanism using an etcd3 cluster +// +// To initialize a locker, a pre-existing connected etcd3 client must be present +// +// client, err := clientv3.New(clientv3.Config{ +// Endpoints: []string{harness.Endpoint}, +// DialTimeout: 5 * time.Second, +// }) +// +// For the most basic locker (e.g. non-shared etcd3 cluster / use default TTLs), +// a locker can be instantiated like the following: +// +// locker, err := etcd3locker.New(client) +// if err != nil { +// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) +// } +// +// The locker will need to be included in composer that is used by tusd: +// +// composer := tusd.NewStoreComposer() +// locker.UseIn(composer) +// +// For a shared etcd3 cluster, you may want to modify the prefix that etcd3locker uses: +// +// locker, err := etcd3locker.NewWithPrefix(client, "my-prefix") +// if err != nil { +// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) +// } +// +// +// For full control over all options, an etcd3.LockerOptions may be passed into +// etcd3.NewWithLockerOptions like the following example: +// +// ttl := 15 // seconds +// options := etcd3locker.NewLockerOptions(ttl, "my-prefix") +// locker, err := etcd3locker.NewWithLockerOptions(client, options) +// if err != nil { +// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error()) +// } +// +// Tested on etcd 3.1/3.2/3.3 +// +package etcd3locker + +import ( + "errors" + "sync" + "time" + + "github.com/tus/tusd" + etcd3 "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" +) + +var ( + ErrLockNotHeld = errors.New("Lock not held") + GrantTimeout = 1500 * time.Millisecond +) + +type Etcd3Locker struct { + // etcd3 client session + Client *etcd3.Client + + // locks is used for storing Etcd3Locks before they are + // unlocked. If you want to release a lock, you need the same locker + // instance and therefore we need to save them temporarily. + locks map[string]*etcd3Lock + mutex sync.Mutex + prefix string + sessionTimeout int +} + +// New constructs a new locker using the provided client. +func New(client *etcd3.Client) (*Etcd3Locker, error) { + return NewWithLockerOptions(client, DefaultLockerOptions()) +} + +// This method may be used if a different prefix is required for multi-tenant etcd clusters +func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) { + lockerOptions := DefaultLockerOptions() + lockerOptions.SetPrefix(prefix) + return NewWithLockerOptions(client, lockerOptions) +} + +// This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular. +func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) { + locksMap := map[string]*etcd3Lock{} + return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTimeout: opts.Timeout(), locks: locksMap, mutex: sync.Mutex{}}, nil +} + +// UseIn adds this locker to the passed composer. +func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) { + composer.UseLocker(locker) +} + +// LockUpload tries to obtain the exclusive lock. +func (locker *Etcd3Locker) LockUpload(id string) error { + session, err := locker.createSession() + if err != nil { + return err + } + + lock := newEtcd3Lock(session, locker.getId(id)) + + err = lock.Acquire() + if err != nil { + return err + } + + locker.mutex.Lock() + defer locker.mutex.Unlock() + // Only add the lock to our list if the acquire was successful and no error appeared. + locker.locks[locker.getId(id)] = lock + + return nil +} + +// UnlockUpload releases a lock. If no such lock exists, no error will be returned. +func (locker *Etcd3Locker) UnlockUpload(id string) error { + locker.mutex.Lock() + defer locker.mutex.Unlock() + + // Complain if no lock has been found. This can only happen if LockUpload + // has not been invoked before or UnlockUpload multiple times. + lock, ok := locker.locks[locker.getId(id)] + if !ok { + return ErrLockNotHeld + } + + err := lock.Release() + if err != nil { + return err + } + + defer delete(locker.locks, locker.getId(id)) + return lock.CloseSession() +} + +func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) { + return concurrency.NewSession(locker.Client, concurrency.WithTTL(locker.sessionTimeout)) +} + +func (locker *Etcd3Locker) getId(id string) string { + return locker.prefix + id +} diff --git a/etcd3locker/locker_options.go b/etcd3locker/locker_options.go new file mode 100644 index 0000000..6852fa7 --- /dev/null +++ b/etcd3locker/locker_options.go @@ -0,0 +1,58 @@ +package etcd3locker + +import ( + "strings" +) + +var ( + DefaultTtl = 60 + DefaultPrefix = "/tusd" +) + +type LockerOptions struct { + timeoutSeconds int + prefix string +} + +func DefaultLockerOptions() LockerOptions { + return LockerOptions{ + timeoutSeconds: 60, + prefix: "/tusd", + } +} + +func NewLockerOptions(timeout int, prefix string) LockerOptions { + return LockerOptions{ + timeoutSeconds: timeout, + prefix: prefix, + } +} + +func (l *LockerOptions) Timeout() int { + if l.timeoutSeconds == 0 { + return DefaultTtl + } else { + return l.timeoutSeconds + } +} + +func (l *LockerOptions) Prefix() string { + prefix := l.prefix + if !strings.HasPrefix(prefix, "/") { + prefix = "/" + prefix + } + + if prefix == "" { + return DefaultPrefix + } else { + return prefix + } +} + +func (l *LockerOptions) SetTimeout(timeout int) { + l.timeoutSeconds = timeout +} + +func (l *LockerOptions) SetPrefix(prefix string) { + l.prefix = prefix +} diff --git a/etcd3locker/locker_test.go b/etcd3locker/locker_test.go new file mode 100644 index 0000000..912554b --- /dev/null +++ b/etcd3locker/locker_test.go @@ -0,0 +1,59 @@ +package etcd3locker + +import ( + etcd_harness "github.com/chen-anders/go-etcd-harness" + "go.etcd.io/etcd/clientv3" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/tus/tusd" +) + +func TestEtcd3Locker(t *testing.T) { + a := assert.New(t) + + harness, err := etcd_harness.New(os.Stderr) + if err != nil { + t.Fatalf("failed starting etcd harness: %v", err) + } + t.Logf("will use etcd harness endpoint: %v", harness.Endpoint) + defer func() { + harness.Stop() + t.Logf("cleaned up etcd harness") + }() + + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{harness.Endpoint}, + DialTimeout: 5 * time.Second, + }) + if err != nil { + t.Fatalf("Unable to connect to etcd3: %v", err) + } + defer client.Close() + + shortTTL := 3 + testPrefix := "/test-tusd" + + lockerOptions := NewLockerOptions(shortTTL, testPrefix) + locker, err := NewWithLockerOptions(client, lockerOptions) + a.NoError(err) + a.NoError(locker.LockUpload("one")) + a.Equal(tusd.ErrFileLocked, locker.LockUpload("one")) + time.Sleep(5 * time.Second) + // test that we can't take over the upload via a different etcd3 session + // while an upload is already taking place; testing etcd3 session KeepAlive + a.Equal(tusd.ErrFileLocked, locker.LockUpload("one")) + a.NoError(locker.UnlockUpload("one")) + a.Equal(ErrLockNotHeld, locker.UnlockUpload("one")) + + testPrefix = "/test-tusd2" + locker2, err := NewWithPrefix(client, testPrefix) + a.NoError(err) + a.NoError(locker2.LockUpload("one")) + a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one")) + a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one")) + a.NoError(locker2.UnlockUpload("one")) + a.Equal(ErrLockNotHeld, locker2.UnlockUpload("one")) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 39b546d..83d5be0 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1,6 +1,6 @@ { "comment": "", - "ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/", + "ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/ github.com/coreos/etcd github.com/mwitkow/go-etcd-harness", "package": [ { "path": "appengine",