Add etcd3 locker (#202)

* First pass at etcd3 locker

* Add etcd packages to vendor ignore attribute

* Properly exclude etcd3locker from earlier versions of golang

* Etcd test to only run on go1.9 and go1.10

* Do not export internal etcd3locker methods

* context.TODO -> context.Background

* Return errors instead of logging

* Enforce a 1.5s timeout on acquiring a lease from etcd

* Allow etcd3 concurreny.NewSession manage KeepAlive connection

- Introduce LockerOptions to allow a custom session TTL (default: 60s); etcd3Locker can be initialized with NewWithLockerOptions if one wants granular control over TTL and prefix used for etcd3 keys
- Keep NewWithPrefix backwards compatible by calling NewWithLockerOptions

* Add comment on locker_test.go regarding testing the session KeepAlive

* Re-export main type

* Try to address missing github.com/gogo/protobuf/gogoproto package

* Update etcd package import to use go.etcd.io/etcd/clientv3

* Use forked go-etcd-harness for testing

* Add more extensive package overview / docs

* go fmt

* Fix test script

* Add downloaded etcd binary to path
This commit is contained in:
Anders Chen 2018-11-10 21:05:44 +01:00 committed by Marius
parent 1b756f0239
commit 9af87d50ce
6 changed files with 333 additions and 2 deletions

View File

@ -5,7 +5,7 @@ set -e
# Find all packages containing Go source code inside the current directory
packages=$(find ./ -maxdepth 2 -name '*.go' -printf '%h\n' | sort | uniq)
# The consul package only supports Go1.8+ and therefore we will only run the
# The consul package only supports Go1.10+ and therefore we will only run the
# corresponding tests on these versions.
goversion=$(go version)
if [[ "$goversion" == *"go1.5"* ]] ||
@ -26,6 +26,28 @@ else
go get -u github.com/hashicorp/consul/...
fi
install_etcd_pkgs() {
ETCD_VERSION="3.3.10"
go get -u go.etcd.io/etcd/clientv3
go get -u github.com/chen-anders/go-etcd-harness
wget -q -O /tmp/etcd.tar.gz "https://github.com/etcd-io/etcd/releases/download/v$ETCD_VERSION/etcd-v$ETCD_VERSION-linux-amd64.tar.gz"
tar xvzf /tmp/etcd.tar.gz -C /tmp
export PATH="$PATH:/tmp/etcd-v$ETCD_VERSION-linux-amd64"
}
# The etcd 3.3.x package only supports Go1.9+ and therefore
# we will only run the corresponding tests on these versions.
if [[ "$goversion" == *"go1.5"* ]] ||
[[ "$goversion" == *"go1.6"* ]] ||
[[ "$goversion" == *"go1.7"* ]] ||
[[ "$goversion" == *"go1.8"* ]]; then
echo "Skipping tests requiring etcd3locker, which is not supported on $goversion"
packages=$(echo "$packages" | sed '/etcd3locker/d')
else
# Install the etcd packages which are not vendored.
install_etcd_pkgs
fi
# Install the AWS SDK and Prometheus client which is explicitly not vendored
go get -u github.com/aws/aws-sdk-go/...
go get -u github.com/prometheus/client_golang/prometheus

47
etcd3locker/lock.go Normal file
View File

@ -0,0 +1,47 @@
// Tested on etcd 3.1+
package etcd3locker
import (
"context"
"time"
"github.com/tus/tusd"
"go.etcd.io/etcd/clientv3/concurrency"
)
type etcd3Lock struct {
Id string
Mutex *concurrency.Mutex
Session *concurrency.Session
}
func newEtcd3Lock(session *concurrency.Session, id string) *etcd3Lock {
return &etcd3Lock{
Mutex: concurrency.NewMutex(session, id),
Session: session,
}
}
func (lock *etcd3Lock) Acquire() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// this is a blocking call; if we receive DeadlineExceeded
// the lock is most likely already taken
if err := lock.Mutex.Lock(ctx); err != nil {
if err == context.DeadlineExceeded {
return tusd.ErrFileLocked
} else {
return err
}
}
return nil
}
func (lock *etcd3Lock) Release() error {
return lock.Mutex.Unlock(context.Background())
}
func (lock *etcd3Lock) CloseSession() error {
return lock.Session.Close()
}

145
etcd3locker/locker.go Normal file
View File

@ -0,0 +1,145 @@
// Package etcd3locker provides a locking mechanism using an etcd3 cluster
//
// To initialize a locker, a pre-existing connected etcd3 client must be present
//
// client, err := clientv3.New(clientv3.Config{
// Endpoints: []string{harness.Endpoint},
// DialTimeout: 5 * time.Second,
// })
//
// For the most basic locker (e.g. non-shared etcd3 cluster / use default TTLs),
// a locker can be instantiated like the following:
//
// locker, err := etcd3locker.New(client)
// if err != nil {
// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
// }
//
// The locker will need to be included in composer that is used by tusd:
//
// composer := tusd.NewStoreComposer()
// locker.UseIn(composer)
//
// For a shared etcd3 cluster, you may want to modify the prefix that etcd3locker uses:
//
// locker, err := etcd3locker.NewWithPrefix(client, "my-prefix")
// if err != nil {
// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
// }
//
//
// For full control over all options, an etcd3.LockerOptions may be passed into
// etcd3.NewWithLockerOptions like the following example:
//
// ttl := 15 // seconds
// options := etcd3locker.NewLockerOptions(ttl, "my-prefix")
// locker, err := etcd3locker.NewWithLockerOptions(client, options)
// if err != nil {
// return nil, fmt.Errorf("Failed to create etcd locker: %v", err.Error())
// }
//
// Tested on etcd 3.1/3.2/3.3
//
package etcd3locker
import (
"errors"
"sync"
"time"
"github.com/tus/tusd"
etcd3 "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
)
var (
ErrLockNotHeld = errors.New("Lock not held")
GrantTimeout = 1500 * time.Millisecond
)
type Etcd3Locker struct {
// etcd3 client session
Client *etcd3.Client
// locks is used for storing Etcd3Locks before they are
// unlocked. If you want to release a lock, you need the same locker
// instance and therefore we need to save them temporarily.
locks map[string]*etcd3Lock
mutex sync.Mutex
prefix string
sessionTimeout int
}
// New constructs a new locker using the provided client.
func New(client *etcd3.Client) (*Etcd3Locker, error) {
return NewWithLockerOptions(client, DefaultLockerOptions())
}
// This method may be used if a different prefix is required for multi-tenant etcd clusters
func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) {
lockerOptions := DefaultLockerOptions()
lockerOptions.SetPrefix(prefix)
return NewWithLockerOptions(client, lockerOptions)
}
// This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular.
func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) {
locksMap := map[string]*etcd3Lock{}
return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTimeout: opts.Timeout(), locks: locksMap, mutex: sync.Mutex{}}, nil
}
// UseIn adds this locker to the passed composer.
func (locker *Etcd3Locker) UseIn(composer *tusd.StoreComposer) {
composer.UseLocker(locker)
}
// LockUpload tries to obtain the exclusive lock.
func (locker *Etcd3Locker) LockUpload(id string) error {
session, err := locker.createSession()
if err != nil {
return err
}
lock := newEtcd3Lock(session, locker.getId(id))
err = lock.Acquire()
if err != nil {
return err
}
locker.mutex.Lock()
defer locker.mutex.Unlock()
// Only add the lock to our list if the acquire was successful and no error appeared.
locker.locks[locker.getId(id)] = lock
return nil
}
// UnlockUpload releases a lock. If no such lock exists, no error will be returned.
func (locker *Etcd3Locker) UnlockUpload(id string) error {
locker.mutex.Lock()
defer locker.mutex.Unlock()
// Complain if no lock has been found. This can only happen if LockUpload
// has not been invoked before or UnlockUpload multiple times.
lock, ok := locker.locks[locker.getId(id)]
if !ok {
return ErrLockNotHeld
}
err := lock.Release()
if err != nil {
return err
}
defer delete(locker.locks, locker.getId(id))
return lock.CloseSession()
}
func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) {
return concurrency.NewSession(locker.Client, concurrency.WithTTL(locker.sessionTimeout))
}
func (locker *Etcd3Locker) getId(id string) string {
return locker.prefix + id
}

View File

@ -0,0 +1,58 @@
package etcd3locker
import (
"strings"
)
var (
DefaultTtl = 60
DefaultPrefix = "/tusd"
)
type LockerOptions struct {
timeoutSeconds int
prefix string
}
func DefaultLockerOptions() LockerOptions {
return LockerOptions{
timeoutSeconds: 60,
prefix: "/tusd",
}
}
func NewLockerOptions(timeout int, prefix string) LockerOptions {
return LockerOptions{
timeoutSeconds: timeout,
prefix: prefix,
}
}
func (l *LockerOptions) Timeout() int {
if l.timeoutSeconds == 0 {
return DefaultTtl
} else {
return l.timeoutSeconds
}
}
func (l *LockerOptions) Prefix() string {
prefix := l.prefix
if !strings.HasPrefix(prefix, "/") {
prefix = "/" + prefix
}
if prefix == "" {
return DefaultPrefix
} else {
return prefix
}
}
func (l *LockerOptions) SetTimeout(timeout int) {
l.timeoutSeconds = timeout
}
func (l *LockerOptions) SetPrefix(prefix string) {
l.prefix = prefix
}

View File

@ -0,0 +1,59 @@
package etcd3locker
import (
etcd_harness "github.com/chen-anders/go-etcd-harness"
"go.etcd.io/etcd/clientv3"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/tus/tusd"
)
func TestEtcd3Locker(t *testing.T) {
a := assert.New(t)
harness, err := etcd_harness.New(os.Stderr)
if err != nil {
t.Fatalf("failed starting etcd harness: %v", err)
}
t.Logf("will use etcd harness endpoint: %v", harness.Endpoint)
defer func() {
harness.Stop()
t.Logf("cleaned up etcd harness")
}()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{harness.Endpoint},
DialTimeout: 5 * time.Second,
})
if err != nil {
t.Fatalf("Unable to connect to etcd3: %v", err)
}
defer client.Close()
shortTTL := 3
testPrefix := "/test-tusd"
lockerOptions := NewLockerOptions(shortTTL, testPrefix)
locker, err := NewWithLockerOptions(client, lockerOptions)
a.NoError(err)
a.NoError(locker.LockUpload("one"))
a.Equal(tusd.ErrFileLocked, locker.LockUpload("one"))
time.Sleep(5 * time.Second)
// test that we can't take over the upload via a different etcd3 session
// while an upload is already taking place; testing etcd3 session KeepAlive
a.Equal(tusd.ErrFileLocked, locker.LockUpload("one"))
a.NoError(locker.UnlockUpload("one"))
a.Equal(ErrLockNotHeld, locker.UnlockUpload("one"))
testPrefix = "/test-tusd2"
locker2, err := NewWithPrefix(client, testPrefix)
a.NoError(err)
a.NoError(locker2.LockUpload("one"))
a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one"))
a.Equal(tusd.ErrFileLocked, locker2.LockUpload("one"))
a.NoError(locker2.UnlockUpload("one"))
a.Equal(ErrLockNotHeld, locker2.UnlockUpload("one"))
}

2
vendor/vendor.json vendored
View File

@ -1,6 +1,6 @@
{
"comment": "",
"ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/",
"ignore": "test github.com/hashicorp/consul/ github.com/aws/aws-sdk-go/ github.com/prometheus/client_golang/prometheus/ github.com/coreos/etcd github.com/mwitkow/go-etcd-harness",
"package": [
{
"path": "appengine",