consullocker: Remove unused package
This commit is contained in:
parent
8ad39ff466
commit
dc092ddd46
|
@ -5,7 +5,7 @@ set -e
|
||||||
# Find all packages containing Go source code inside the current directory
|
# Find all packages containing Go source code inside the current directory
|
||||||
packages=$(find ./ -maxdepth 2 -name '*.go' -printf '%h\n' | sort | uniq)
|
packages=$(find ./ -maxdepth 2 -name '*.go' -printf '%h\n' | sort | uniq)
|
||||||
|
|
||||||
# The consul package only supports Go1.10+ and therefore we will only run the
|
# Some packages only support Go1.10+ and therefore we will only run the
|
||||||
# corresponding tests on these versions.
|
# corresponding tests on these versions.
|
||||||
goversion=$(go version)
|
goversion=$(go version)
|
||||||
if [[ "$goversion" == *"go1.5"* ]] ||
|
if [[ "$goversion" == *"go1.5"* ]] ||
|
||||||
|
@ -14,19 +14,12 @@ if [[ "$goversion" == *"go1.5"* ]] ||
|
||||||
[[ "$goversion" == *"go1.8"* ]] ||
|
[[ "$goversion" == *"go1.8"* ]] ||
|
||||||
[[ "$goversion" == *"go1.9"* ]]; then
|
[[ "$goversion" == *"go1.9"* ]]; then
|
||||||
|
|
||||||
echo "Skipping tests requiring Consul which is not supported on $goversion"
|
|
||||||
|
|
||||||
# Exclude consullocker since this may not be run on all Go versions.
|
|
||||||
packages=$(echo "$packages" | sed '/consul/d')
|
|
||||||
|
|
||||||
echo "Skipping tests requiring GCSStore, which is not supported on $goversion"
|
echo "Skipping tests requiring GCSStore, which is not supported on $goversion"
|
||||||
packages=$(echo "$packages" | sed '/gcsstore/d')
|
packages=$(echo "$packages" | sed '/gcsstore/d')
|
||||||
|
|
||||||
echo "Skipping tests requiring Prometheus, which is not supported on $goversion"
|
echo "Skipping tests requiring Prometheus, which is not supported on $goversion"
|
||||||
packages=$(echo "$packages" | sed '/prometheuscollector/d')
|
packages=$(echo "$packages" | sed '/prometheuscollector/d')
|
||||||
else
|
else
|
||||||
# Install the Consul and Prometheus client packages which are not vendored.
|
|
||||||
go get -u github.com/hashicorp/consul/...
|
|
||||||
go get -u github.com/prometheus/client_golang/prometheus
|
go get -u github.com/prometheus/client_golang/prometheus
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
|
@ -226,7 +226,6 @@ useful tools:
|
||||||
* [**filestore**](https://godoc.org/github.com/tus/tusd/filestore): A storage backend using the local file system
|
* [**filestore**](https://godoc.org/github.com/tus/tusd/filestore): A storage backend using the local file system
|
||||||
* [**gcsstore**](https://godoc.org/github.com/tus/tusd/gcsstore): A storage backend using Google cloud storage
|
* [**gcsstore**](https://godoc.org/github.com/tus/tusd/gcsstore): A storage backend using Google cloud storage
|
||||||
* [**memorylocker**](https://godoc.org/github.com/tus/tusd/memorylocker): An in-memory locker for handling concurrent uploads
|
* [**memorylocker**](https://godoc.org/github.com/tus/tusd/memorylocker): An in-memory locker for handling concurrent uploads
|
||||||
* [**consullocker**](https://godoc.org/github.com/tus/tusd/consullocker): A locker using the distributed Consul service
|
|
||||||
* [**etcd3locker**](https://godoc.org/github.com/tus/tusd/etcd3locker): A locker using the distributed KV etcd3 store
|
* [**etcd3locker**](https://godoc.org/github.com/tus/tusd/etcd3locker): A locker using the distributed KV etcd3 store
|
||||||
* [**limitedstore**](https://godoc.org/github.com/tus/tusd/limitedstore): A storage wrapper limiting the total used space for uploads
|
* [**limitedstore**](https://godoc.org/github.com/tus/tusd/limitedstore): A storage wrapper limiting the total used space for uploads
|
||||||
|
|
||||||
|
|
|
@ -1,122 +0,0 @@
|
||||||
// Package consullocker provides a locking mechanism using a Consul server.
|
|
||||||
//
|
|
||||||
// Consul's (https://www.consul.io) key/value storage system can also be used
|
|
||||||
// for building a distributed exclusive locking mechanism, often referred to
|
|
||||||
// as leader election (https://www.consul.io/docs/guides/leader-election.html).
|
|
||||||
//
|
|
||||||
// Due to Consul being an external server, connection issues can occur between
|
|
||||||
// tusd and Consul. In this situation, tusd cannot always ensure that it still
|
|
||||||
// holds a lock and may panic in an unrecoverable way. This may seems like an
|
|
||||||
// inconvenient decision but is probably the best solution since we are not
|
|
||||||
// able to interrupt other goroutines which may be involved in moving the
|
|
||||||
// uploaded data to a backend.
|
|
||||||
package consullocker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
consul "github.com/hashicorp/consul/api"
|
|
||||||
"github.com/tus/tusd"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ConsulLocker struct {
|
|
||||||
// Client used to connect to the Consul server
|
|
||||||
Client *consul.Client
|
|
||||||
|
|
||||||
// ConnectionName is an optional field which may contain a human-readable
|
|
||||||
// description for the connection. It is only used for composing error
|
|
||||||
// messages and can be used to match them to a specific Consul instance.
|
|
||||||
ConnectionName string
|
|
||||||
|
|
||||||
// locks is used for storing consul.Lock structs before they are unlocked.
|
|
||||||
// If you want to release a lock, you need the same consul.Lock instance
|
|
||||||
// and therefore we need to save them temporarily.
|
|
||||||
locks map[string]*consul.Lock
|
|
||||||
mutex *sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// New constructs a new locker using the provided client.
|
|
||||||
func New(client *consul.Client) *ConsulLocker {
|
|
||||||
return &ConsulLocker{
|
|
||||||
Client: client,
|
|
||||||
locks: make(map[string]*consul.Lock),
|
|
||||||
mutex: new(sync.RWMutex),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// UseIn adds this locker to the passed composer.
|
|
||||||
func (locker *ConsulLocker) UseIn(composer *tusd.StoreComposer) {
|
|
||||||
composer.UseLocker(locker)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LockUpload tries to obtain the exclusive lock.
|
|
||||||
func (locker *ConsulLocker) LockUpload(id string) error {
|
|
||||||
lock, err := locker.Client.LockOpts(&consul.LockOptions{
|
|
||||||
Key: id + "/" + consul.DefaultSemaphoreKey,
|
|
||||||
LockTryOnce: true,
|
|
||||||
LockWaitTime: time.Second,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ch, err := lock.Lock(nil)
|
|
||||||
if ch == nil {
|
|
||||||
if err == nil || err == consul.ErrLockHeld {
|
|
||||||
return tusd.ErrFileLocked
|
|
||||||
} else {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
locker.mutex.Lock()
|
|
||||||
defer locker.mutex.Unlock()
|
|
||||||
// Only add the lock to our list if the acquire was successful and no error appeared.
|
|
||||||
locker.locks[id] = lock
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
// This channel will be closed once we lost the lock. This can either happen
|
|
||||||
// wanted (using the Unlock method) or by accident, e.g. if the connection
|
|
||||||
// to the Consul server is lost.
|
|
||||||
<-ch
|
|
||||||
|
|
||||||
locker.mutex.RLock()
|
|
||||||
defer locker.mutex.RUnlock()
|
|
||||||
// Only proceed if the lock has been lost by accident. If we cannot find it
|
|
||||||
// in the map, it has already been gracefully removed (see UnlockUpload).
|
|
||||||
if _, ok := locker.locks[id]; !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := "consullocker: lock for upload '" + id + "' has been lost."
|
|
||||||
if locker.ConnectionName != "" {
|
|
||||||
msg += " Please ensure that the connection to '" + locker.ConnectionName + "' is stable."
|
|
||||||
} else {
|
|
||||||
msg += " Please ensure that the connection to Consul is stable (use ConnectionName to provide a printable name)."
|
|
||||||
}
|
|
||||||
|
|
||||||
// This will cause the program to crash since a panic can only be recovered
|
|
||||||
// from the causing goroutine.
|
|
||||||
panic(msg)
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnlockUpload releases a lock.
|
|
||||||
func (locker *ConsulLocker) UnlockUpload(id string) error {
|
|
||||||
locker.mutex.Lock()
|
|
||||||
defer locker.mutex.Unlock()
|
|
||||||
|
|
||||||
// Complain if no lock has been found. This can only happen if LockUpload
|
|
||||||
// has not been invoked before or UnlockUpload multiple times.
|
|
||||||
lock, ok := locker.locks[id]
|
|
||||||
if !ok {
|
|
||||||
return consul.ErrLockNotHeld
|
|
||||||
}
|
|
||||||
|
|
||||||
defer delete(locker.locks, id)
|
|
||||||
|
|
||||||
return lock.Unlock()
|
|
||||||
}
|
|
|
@ -1,61 +0,0 @@
|
||||||
package consullocker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
consul "github.com/hashicorp/consul/api"
|
|
||||||
consultestutil "github.com/hashicorp/consul/sdk/testutil"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"github.com/tus/tusd"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestConsulLocker(t *testing.T) {
|
|
||||||
a := assert.New(t)
|
|
||||||
|
|
||||||
server, err := consultestutil.NewTestServer()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer server.Stop()
|
|
||||||
|
|
||||||
conf := consul.DefaultConfig()
|
|
||||||
conf.Address = server.HTTPAddr
|
|
||||||
client, err := consul.NewClient(conf)
|
|
||||||
a.NoError(err)
|
|
||||||
|
|
||||||
locker := New(client)
|
|
||||||
|
|
||||||
a.NoError(locker.LockUpload("one"))
|
|
||||||
a.Equal(tusd.ErrFileLocked, locker.LockUpload("one"))
|
|
||||||
a.NoError(locker.UnlockUpload("one"))
|
|
||||||
a.Equal(consul.ErrLockNotHeld, locker.UnlockUpload("one"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLockLost(t *testing.T) {
|
|
||||||
// This test will panic because the connection to Consul will be cut, which
|
|
||||||
// is indented.
|
|
||||||
// TODO: find a way to test this
|
|
||||||
t.SkipNow()
|
|
||||||
|
|
||||||
a := assert.New(t)
|
|
||||||
|
|
||||||
server, err := consultestutil.NewTestServer()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := consul.NewClient(&consul.Config{
|
|
||||||
Address: server.HTTPAddr,
|
|
||||||
})
|
|
||||||
a.NoError(err)
|
|
||||||
|
|
||||||
locker := New(client)
|
|
||||||
locker.ConnectionName = server.HTTPAddr
|
|
||||||
|
|
||||||
a.NoError(locker.LockUpload("two"))
|
|
||||||
|
|
||||||
server.Stop()
|
|
||||||
time.Sleep(time.Hour)
|
|
||||||
}
|
|
Loading…
Reference in New Issue