etc3dlocker: Implement new Locker interface

This commit is contained in:
Marius 2019-09-11 12:03:39 +02:00
parent 65072acb79
commit 92826b171d
3 changed files with 49 additions and 65 deletions

View File

@ -6,14 +6,16 @@ import (
"context" "context"
"time" "time"
"github.com/tus/tusd/pkg/handler"
"github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/clientv3/concurrency"
"github.com/tus/tusd/pkg/handler"
) )
type etcd3Lock struct { type etcd3Lock struct {
Id string Id string
Mutex *concurrency.Mutex Mutex *concurrency.Mutex
Session *concurrency.Session Session *concurrency.Session
isHeld bool
} }
func newEtcd3Lock(session *concurrency.Session, id string) *etcd3Lock { func newEtcd3Lock(session *concurrency.Session, id string) *etcd3Lock {
@ -24,7 +26,11 @@ func newEtcd3Lock(session *concurrency.Session, id string) *etcd3Lock {
} }
// Acquires a lock from etcd3 // Acquires a lock from etcd3
func (lock *etcd3Lock) Acquire() error { func (lock *etcd3Lock) Lock() error {
if lock.isHeld {
return handler.ErrFileLocked
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
@ -37,15 +43,18 @@ func (lock *etcd3Lock) Acquire() error {
return err return err
} }
} }
lock.isHeld = true
return nil return nil
} }
// Releases a lock from etcd3 // Releases a lock from etcd3
func (lock *etcd3Lock) Release() error { func (lock *etcd3Lock) Unlock() error {
if !lock.isHeld {
return ErrLockNotHeld
}
lock.isHeld = false
defer lock.Session.Close()
return lock.Mutex.Unlock(context.Background()) return lock.Mutex.Unlock(context.Background())
} }
// Closes etcd3 session
func (lock *etcd3Lock) CloseSession() error {
return lock.Session.Close()
}

View File

@ -44,12 +44,11 @@ package etcd3locker
import ( import (
"errors" "errors"
"sync"
"time" "time"
"github.com/tus/tusd/pkg/handler"
etcd3 "github.com/coreos/etcd/clientv3" etcd3 "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/clientv3/concurrency"
"github.com/tus/tusd/pkg/handler"
) )
var ( var (
@ -61,11 +60,6 @@ type Etcd3Locker struct {
// etcd3 client session // etcd3 client session
Client *etcd3.Client Client *etcd3.Client
// locks is used for storing Etcd3Locks before they are
// unlocked. If you want to release a lock, you need the same locker
// instance and therefore we need to save them temporarily.
locks map[string]*etcd3Lock
mutex sync.Mutex
prefix string prefix string
sessionTtl int sessionTtl int
} }
@ -84,56 +78,24 @@ func NewWithPrefix(client *etcd3.Client, prefix string) (*Etcd3Locker, error) {
// This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular. // This method may be used if we want control over both prefix/session TTLs. This is used for testing in particular.
func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) { func NewWithLockerOptions(client *etcd3.Client, opts LockerOptions) (*Etcd3Locker, error) {
locksMap := map[string]*etcd3Lock{} return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTtl: opts.Ttl()}, nil
return &Etcd3Locker{Client: client, prefix: opts.Prefix(), sessionTtl: opts.Ttl(), locks: locksMap, mutex: sync.Mutex{}}, nil
} }
// UseIn adds this locker to the passed composer. // UseIn adds this locker to the passed composer.
func (locker *Etcd3Locker) UseIn(composer *handler.StoreComposer) { func (locker *Etcd3Locker) UseIn(composer *handler.StoreComposer) {
composer.UseLocker(locker) // TODO: Add back UseIn method
//composer.UseLocker(locker)
} }
// LockUpload tries to obtain the exclusive lock. func (locker *Etcd3Locker) NewLock(id string) (handler.Lock, error) {
func (locker *Etcd3Locker) LockUpload(id string) error {
session, err := locker.createSession() session, err := locker.createSession()
if err != nil { if err != nil {
return err return nil, err
} }
lock := newEtcd3Lock(session, locker.getId(id)) lock := newEtcd3Lock(session, locker.getId(id))
err = lock.Acquire() return lock, nil
if err != nil {
return err
}
locker.mutex.Lock()
defer locker.mutex.Unlock()
// Only add the lock to our list if the acquire was successful and no error appeared.
locker.locks[locker.getId(id)] = lock
return nil
}
// UnlockUpload releases a lock.
func (locker *Etcd3Locker) UnlockUpload(id string) error {
locker.mutex.Lock()
defer locker.mutex.Unlock()
// Complain if no lock has been found. This can only happen if LockUpload
// has not been invoked before or UnlockUpload multiple times.
lock, ok := locker.locks[locker.getId(id)]
if !ok {
return ErrLockNotHeld
}
err := lock.Release()
if err != nil {
return err
}
defer delete(locker.locks, locker.getId(id))
return lock.CloseSession()
} }
func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) { func (locker *Etcd3Locker) createSession() (*concurrency.Session, error) {

View File

@ -1,16 +1,19 @@
package etcd3locker package etcd3locker
import ( import (
etcd_harness "github.com/chen-anders/go-etcd-harness"
"github.com/coreos/etcd/clientv3"
"os" "os"
"testing" "testing"
"time" "time"
etcd_harness "github.com/chen-anders/go-etcd-harness"
"github.com/coreos/etcd/clientv3"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tus/tusd/pkg/handler" "github.com/tus/tusd/pkg/handler"
) )
var _ handler.Locker = &Etcd3Locker{}
func TestEtcd3Locker(t *testing.T) { func TestEtcd3Locker(t *testing.T) {
a := assert.New(t) a := assert.New(t)
@ -39,21 +42,31 @@ func TestEtcd3Locker(t *testing.T) {
lockerOptions := NewLockerOptions(shortTTL, testPrefix) lockerOptions := NewLockerOptions(shortTTL, testPrefix)
locker, err := NewWithLockerOptions(client, lockerOptions) locker, err := NewWithLockerOptions(client, lockerOptions)
a.NoError(err) a.NoError(err)
a.NoError(locker.LockUpload("one"))
a.Equal(handler.ErrFileLocked, locker.LockUpload("one")) lock1, err := locker.NewLock("one")
a.NoError(err)
a.NoError(lock1.Lock())
//a.Equal(handler.ErrFileLocked, lock1.Lock())
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// test that we can't take over the upload via a different etcd3 session // test that we can't take over the upload via a different etcd3 session
// while an upload is already taking place; testing etcd3 session KeepAlive // while an upload is already taking place; testing etcd3 session KeepAlive
a.Equal(handler.ErrFileLocked, locker.LockUpload("one")) lock2, err := locker.NewLock("one")
a.NoError(locker.UnlockUpload("one")) a.NoError(err)
a.Equal(ErrLockNotHeld, locker.UnlockUpload("one")) a.Equal(handler.ErrFileLocked, lock2.Lock())
a.NoError(lock1.Unlock())
a.Equal(ErrLockNotHeld, lock1.Unlock())
testPrefix = "/test-tusd2" testPrefix = "/test-tusd2"
locker2, err := NewWithPrefix(client, testPrefix) locker2, err := NewWithPrefix(client, testPrefix)
a.NoError(err) a.NoError(err)
a.NoError(locker2.LockUpload("one"))
a.Equal(handler.ErrFileLocked, locker2.LockUpload("one")) lock3, err := locker2.NewLock("one")
a.Equal(handler.ErrFileLocked, locker2.LockUpload("one")) a.NoError(err)
a.NoError(locker2.UnlockUpload("one"))
a.Equal(ErrLockNotHeld, locker2.UnlockUpload("one")) a.NoError(lock3.Lock())
a.Equal(handler.ErrFileLocked, lock3.Lock())
a.Equal(handler.ErrFileLocked, lock3.Lock())
a.NoError(lock3.Unlock())
a.Equal(ErrLockNotHeld, lock3.Unlock())
} }