Add locking using Consul

This commit is contained in:
Marius 2016-02-11 16:04:50 +01:00
parent c7ce14fbdc
commit 290d03aff9
2 changed files with 172 additions and 0 deletions

View File

@ -0,0 +1,117 @@
// Package consullocker provides a locking mechanism using a Consul server.
//
// Consul's (https://www.consul.io) key/value storage system can also be used
// for building a distributed exclusive locking mechanism, often referred to
// as leader election (https://www.consul.io/docs/guides/leader-election.html).
//
// Due to Consul being an external server, connection issues can occur between
// tusd and Consul. In this situation, tusd cannot always ensure that it still
// holds a lock and may panic in an unrecoverable way. This may seems like an
// inconvenient decision but is probably the best solution since we are not
// able to interrupt other goroutines which may be involved in moving the
// uploaded data to a backend.
package consullocker
import (
"sync"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/tus/tusd"
)
type ConsulLocker struct {
// Client used to connect to the Consul server
Client *consul.Client
// ConnectionName is an optional field which may contain a human-readable
// description for the connection. It is only used for composing error
// messages and can be used to match them to a specific Consul instance.
ConnectionName string
// locks is used for storing consul.Lock structs before they are unlocked.
// If you want to release a lock, you need the same consul.Lock instance
// and therefore we need to save them temporarily.
locks map[string]*consul.Lock
mutex *sync.RWMutex
}
// New constructs a new locker using the provided client.
func New(client *consul.Client) *ConsulLocker {
return &ConsulLocker{
Client: client,
locks: make(map[string]*consul.Lock),
mutex: new(sync.RWMutex),
}
}
// LockUpload tries to obtain the exclusive lock.
func (locker *ConsulLocker) LockUpload(id string) error {
lock, err := locker.Client.LockOpts(&consul.LockOptions{
Key: id + "/" + consul.DefaultSemaphoreKey,
LockTryOnce: true,
LockWaitTime: time.Second,
})
if err != nil {
return err
}
ch, err := lock.Lock(nil)
if ch == nil {
if err == nil || err == consul.ErrLockHeld {
return tusd.ErrFileLocked
} else {
return err
}
}
locker.mutex.Lock()
defer locker.mutex.Unlock()
// Only add the lock to our list if the acquire was successful and no error appeared.
locker.locks[id] = lock
go func() {
// This channel will be closed once we lost the lock. This can either happen
// wanted (using the Unlock method) or by accident, e.g. if the connection
// to the Consul server is lost.
<-ch
locker.mutex.RLock()
defer locker.mutex.RUnlock()
// Only proceed if the lock has been lost by accident. If we cannot find it
// in the map, it has already been gracefully removed (see UnlockUpload).
if _, ok := locker.locks[id]; !ok {
return
}
msg := "consullocker: lock for upload '" + id + "' has been lost."
if locker.ConnectionName != "" {
msg += " Please ensure that the connection to '" + locker.ConnectionName + "' is stable."
} else {
msg += " Please ensure that the connection to Consul is stable (use ConnectionName to provide a printable name)."
}
// This will cause the program to crash since a panic can only be recovered
// from the causing goroutine.
panic(msg)
}()
return nil
}
// UnlockUpload releases a lock. If no such lock exists, no error will be returned.
func (locker *ConsulLocker) UnlockUpload(id string) error {
locker.mutex.Lock()
defer locker.mutex.Unlock()
// Complain if no lock has been found. This can only happen if LockUpload
// has not been invoked before or UnlockUpload multiple times.
lock, ok := locker.locks[id]
if !ok {
return consul.ErrLockNotHeld
}
defer delete(locker.locks, id)
return lock.Unlock()
}

View File

@ -0,0 +1,55 @@
package consullocker
import (
"testing"
"time"
consul "github.com/hashicorp/consul/api"
consultestutil "github.com/hashicorp/consul/testutil"
"github.com/stretchr/testify/assert"
"github.com/tus/tusd"
)
func TestConsulLocker(t *testing.T) {
a := assert.New(t)
server := consultestutil.NewTestServer(t)
defer server.Stop()
client, err := consul.NewClient(&consul.Config{
Address: server.HTTPAddr,
})
a.NoError(err)
locker := New(client)
a.NoError(locker.LockUpload("one"))
a.Equal(tusd.ErrFileLocked, locker.LockUpload("one"))
a.NoError(locker.UnlockUpload("one"))
a.Equal(consul.ErrLockNotHeld, locker.UnlockUpload("one"))
}
func TestLockLost(t *testing.T) {
// This test will panic because the connection to Consul will be cut, which
// is indented.
// TODO: find a way to test this
t.SkipNow()
a := assert.New(t)
server := consultestutil.NewTestServer(t)
client, err := consul.NewClient(&consul.Config{
Address: server.HTTPAddr,
})
a.NoError(err)
locker := New(client)
locker.ConnectionName = server.HTTPAddr
a.NoError(locker.LockUpload("two"))
server.Stop()
time.Sleep(time.Hour)
}