Integrate LockingStore into UnroutedHandler

The reason behind this drastic move was that sometimes a lock needs to
persist across multiple calls to a DataStore. For example, when a PATCH
request is received, following behaviour is wanted:
	* Obtain lock (LockUpload)
	* Get offset, size etc (GetInfo)
	* Write data (WriteChunk)
	* Release lock (UnlockUpload)
However, before this change, the lock would be release and then obtained again
after the GetInfo and before the WriteChunk call. This resulted in an
inefficient resource usage and even a possible race condition.

The effects of this change was:
* FileLocker is now directly integrated into FileStore and not sperarated
* LockingStore and the entire package has been removed
* MemoryLocker has been moved into its very own package
This commit is contained in:
Marius 2015-12-26 21:23:09 +01:00
parent cfabbf5ffb
commit f6a5530df8
12 changed files with 296 additions and 328 deletions

View File

@ -55,3 +55,24 @@ type DataStore interface {
// and writing, must return os.ErrNotExist or similar.
Terminate(id string) error
}
// LockerDataStore is the interface required for custom lock persisting mechanisms.
// Common ways to store this information is in memory, on disk or using an
// external service, such as ZooKeeper.
// When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a syncronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper
// order of chunks inside a single upload.
type LockerDataStore interface {
DataStore
// LockUpload attempts to obtain an exclusive lock for the upload specified
// by its id.
// If this operation fails because the resource is already locked, the
// tusd.ErrFileLocked must be returned. If no error is returned, the attempt
// is consider to be successful and the upload to be locked until UnlockUpload
// is invoked for the same upload.
LockUpload(id string) error
// UnlockUpload releases an existing lock for the given upload.
UnlockUpload(id string) error
}

View File

@ -1,65 +0,0 @@
package filestore
import (
"os"
"path/filepath"
"github.com/nightlyone/lockfile"
"github.com/tus/tusd"
)
// FileLocker provides an exclusive upload locking mechansim using lock files
// which are stored on disk. Each of them stores the PID of the process which
// aquired the lock. This allows locks to be automatically freed when a process
// is unable to release it on its own because the process is not alive anymore.
//
// Please consult the package lockingstore for instructions on how to use this
// locker.
type FileLocker struct {
// Relative or absolute path to store the locks in.
Path string
}
func (locker FileLocker) LockUpload(id string) error {
lock, err := locker.newLock(id)
if err != nil {
return err
}
err = lock.TryLock()
if err == lockfile.ErrBusy {
return tusd.ErrFileLocked
}
return err
}
func (locker FileLocker) UnlockUpload(id string) error {
lock, err := locker.newLock(id)
if err != nil {
return err
}
err = lock.Unlock()
// A "no such file or directory" will be returned if no lockfile was found.
// Since this means that the file has never been locked, we drop the error
// and continue as if nothing happend.
if os.IsNotExist(err) {
err = nil
}
return nil
}
func (locker FileLocker) newLock(id string) (lockfile.Lockfile, error) {
path, err := filepath.Abs(locker.Path + "/" + id + ".lock")
if err != nil {
return lockfile.Lockfile(""), err
}
// We use Lockfile directly instead of lockfile.New to bypass the unnecessary
// check whether the provided path is absolute since we just resolved it
// on our own.
return lockfile.Lockfile(path), nil
}

View File

@ -1,35 +0,0 @@
package filestore
import (
"io/ioutil"
"testing"
"github.com/tus/tusd"
"github.com/tus/tusd/lockingstore"
)
func TestFileLocker(t *testing.T) {
dir, err := ioutil.TempDir("", "tusd-file-locker")
if err != nil {
t.Fatal(err)
}
var locker lockingstore.Locker
locker = FileLocker{dir}
if err := locker.LockUpload("one"); err != nil {
t.Errorf("unexpected error when locking file: %s", err)
}
if err := locker.LockUpload("one"); err != tusd.ErrFileLocked {
t.Errorf("expected error when locking locked file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file again: %s", err)
}
}

View File

@ -6,6 +6,13 @@
// `[id].bin` files contain the raw binary data uploaded.
// No cleanup is performed so you may want to run a cronjob to ensure your disk
// is not filled up with old and finished uploads.
//
// In addition, it provides an exclusive upload locking mechansim using lock files
// which are stored on disk. Each of them stores the PID of the process which
// aquired the lock. This allows locks to be automatically freed when a process
// is unable to release it on its own because the process is not alive anymore.
// For more information, consult the documentation for tusd.LockerDataStore
// interface, which is implemented by FileStore
package filestore
import (
@ -13,10 +20,12 @@ import (
"io"
"io/ioutil"
"os"
"path/filepath"
"github.com/tus/tusd"
"github.com/tus/tusd/lockingstore"
"github.com/tus/tusd/uid"
"github.com/nightlyone/lockfile"
)
var defaultFilePerm = os.FileMode(0775)
@ -32,16 +41,9 @@ type FileStore struct {
// New creates a new file based storage backend. The directory specified will
// be used as the only storage entry. This method does not check
// whether the path exists, use os.MkdirAll to ensure.
// In addition, a locking mechanism is provided using lockingstore.LockingStore
// and FileLocker.
func New(path string) tusd.DataStore {
store := FileStore{path}
locker := FileLocker{path}
return lockingstore.LockingStore{
DataStore: &store,
Locker: &locker,
}
// In addition, a locking mechanism is provided.
func New(path string) FileStore {
return FileStore{path}
}
func (store FileStore) NewUpload(info tusd.FileInfo) (id string, err error) {
@ -100,17 +102,62 @@ func (store FileStore) Terminate(id string) error {
return nil
}
// Return the path to the .bin storing the binary data
func (store FileStore) LockUpload(id string) error {
lock, err := store.newLock(id)
if err != nil {
return err
}
err = lock.TryLock()
if err == lockfile.ErrBusy {
return tusd.ErrFileLocked
}
return err
}
func (store FileStore) UnlockUpload(id string) error {
lock, err := store.newLock(id)
if err != nil {
return err
}
err = lock.Unlock()
// A "no such file or directory" will be returned if no lockfile was found.
// Since this means that the file has never been locked, we drop the error
// and continue as if nothing happend.
if os.IsNotExist(err) {
err = nil
}
return nil
}
// newLock contructs a new Lockfile instance.
func (store FileStore) newLock(id string) (lockfile.Lockfile, error) {
path, err := filepath.Abs(store.Path + "/" + id + ".lock")
if err != nil {
return lockfile.Lockfile(""), err
}
// We use Lockfile directly instead of lockfile.New to bypass the unnecessary
// check whether the provided path is absolute since we just resolved it
// on our own.
return lockfile.Lockfile(path), nil
}
// binPath returns the path to the .bin storing the binary data.
func (store FileStore) binPath(id string) string {
return store.Path + "/" + id + ".bin"
}
// Return the path to the .info file storing the file's info
// infoPath returns the path to the .info file storing the file's info.
func (store FileStore) infoPath(id string) string {
return store.Path + "/" + id + ".info"
}
// Update the entire information. Everything will be overwritten.
// writeInfo updates the entire information. Everything will be overwritten.
func (store FileStore) writeInfo(id string, info tusd.FileInfo) error {
data, err := json.Marshal(info)
if err != nil {
@ -119,7 +166,7 @@ func (store FileStore) writeInfo(id string, info tusd.FileInfo) error {
return ioutil.WriteFile(store.infoPath(id), data, defaultFilePerm)
}
// Update the .info file using the new upload.
// setOffset updates the .info file to match the new offset.
func (store FileStore) setOffset(id string, offset int64) error {
info, err := store.GetInfo(id)
if err != nil {

View File

@ -95,3 +95,29 @@ func TestFilestore(t *testing.T) {
t.Fatal("expected os.ErrIsNotExist")
}
}
func TestFileLocker(t *testing.T) {
dir, err := ioutil.TempDir("", "tusd-file-locker")
if err != nil {
t.Fatal(err)
}
var locker tusd.LockerDataStore
locker = FileStore{dir}
if err := locker.LockUpload("one"); err != nil {
t.Errorf("unexpected error when locking file: %s", err)
}
if err := locker.LockUpload("one"); err != tusd.ErrFileLocked {
t.Errorf("expected error when locking locked file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file again: %s", err)
}
}

View File

@ -1,99 +0,0 @@
// Package lockingstore manages concurrent access to a single upload.
//
// When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a syncronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper
// order of chunks inside a single upload.
//
// This package wrappes an existing data storage and only allows a single access
// at a time by using an exclusive locking mechanism.
package lockingstore
import (
"io"
"github.com/tus/tusd"
)
// Locker is the interface required for custom lock persisting mechanisms.
// Common ways to store this information is in memory, on disk or using an
// external service, such as ZooKeeper.
type Locker interface {
// LockUpload attempts to obtain an exclusive lock for the upload specified
// by its id.
// If this operation fails because the resource is already locked, the
// tusd.ErrFileLocked must be returned. If no error is returned, the attempt
// is consider to be successful and the upload to be locked until UnlockUpload
// is invoked for the same upload.
LockUpload(id string) error
// UnlockUpload releases an existing lock for the given upload.
UnlockUpload(id string) error
}
// LockingStore wraps an existing data storage and catches all operation.
// Before passing the method calls to the underlying backend, locks are required
// to be obtained.
type LockingStore struct {
// The underlying data storage to which the operation will be passed if an
// upload is not locked.
tusd.DataStore
// The custom locking persisting mechanism used for obtaining and releasing
// locks.
Locker Locker
}
func (store LockingStore) WriteChunk(id string, offset int64, src io.Reader) (n int64, err error) {
if err := store.Locker.LockUpload(id); err != nil {
return 0, err
}
defer func() {
if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil {
err = unlockErr
}
}()
return store.DataStore.WriteChunk(id, offset, src)
}
func (store LockingStore) GetInfo(id string) (info tusd.FileInfo, err error) {
if err := store.Locker.LockUpload(id); err != nil {
return info, err
}
defer func() {
if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil {
err = unlockErr
}
}()
return store.DataStore.GetInfo(id)
}
func (store LockingStore) GetReader(id string) (src io.Reader, err error) {
if err := store.Locker.LockUpload(id); err != nil {
return nil, err
}
defer func() {
if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil {
err = unlockErr
}
}()
return store.DataStore.GetReader(id)
}
func (store LockingStore) Terminate(id string) (err error) {
if err := store.Locker.LockUpload(id); err != nil {
return err
}
defer func() {
if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil {
err = unlockErr
}
}()
return store.DataStore.Terminate(id)
}

View File

@ -1,82 +0,0 @@
package lockingstore_test
import (
"io"
"testing"
"github.com/tus/tusd"
. "github.com/tus/tusd/lockingstore"
)
type store struct {
calls int
}
func (store *store) NewUpload(info tusd.FileInfo) (string, error) {
return "", nil
}
func (store *store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
store.calls += 1
return 0, nil
}
func (store *store) GetInfo(id string) (tusd.FileInfo, error) {
store.calls += 1
return tusd.FileInfo{}, nil
}
func (store *store) GetReader(id string) (io.Reader, error) {
store.calls += 1
return nil, nil
}
func (store *store) Terminate(id string) error {
store.calls += 1
return nil
}
type locker struct {
lockCalls int
unlockCalls int
}
func (locker *locker) LockUpload(id string) error {
locker.lockCalls += 1
if id == "no" {
return tusd.ErrFileLocked
}
return nil
}
func (locker *locker) UnlockUpload(id string) error {
locker.unlockCalls += 1
return nil
}
func TestLockingStore(t *testing.T) {
locker := new(locker)
store := new(store)
lstore := LockingStore{
DataStore: store,
Locker: locker,
}
lstore.NewUpload(tusd.FileInfo{})
lstore.WriteChunk("", 0, nil)
lstore.GetInfo("")
lstore.GetReader("")
lstore.Terminate("")
lstore.WriteChunk("no", 0, nil)
lstore.GetInfo("no")
lstore.GetReader("no")
lstore.Terminate("no")
if locker.lockCalls != 8 {
t.Error("expected 8 calls to LockUpload, but got %d", locker.lockCalls)
}
if locker.unlockCalls != 4 {
t.Error("expected 8 calls to UnlockUpload, but got %d", locker.unlockCalls)
}
}

View File

@ -1,28 +0,0 @@
package lockingstore
import (
"testing"
"github.com/tus/tusd"
)
func TestMemoryLocker(t *testing.T) {
var locker Locker
locker = NewMemoryLocker()
if err := locker.LockUpload("one"); err != nil {
t.Errorf("unexpected error when locking file: %s", err)
}
if err := locker.LockUpload("one"); err != tusd.ErrFileLocked {
t.Errorf("expected error when locking locked file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file again: %s", err)
}
}

View File

@ -1,4 +1,14 @@
package lockingstore
// Package memorylocker provides an in-memory locking mechanism
//
// When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a syncronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper
// order of chunks inside a single upload.
//
// MemoryLocker persists locks using memory and therefore allowing a simple and
// cheap mechansim. Locks will only exist as long as this object is kept in
// reference and will be erased if the program exits.
package memorylocker
import (
"github.com/tus/tusd"
@ -8,12 +18,14 @@ import (
// cheap mechansim. Locks will only exist as long as this object is kept in
// reference and will be erased if the program exits.
type MemoryLocker struct {
tusd.DataStore
locks map[string]bool
}
// New creates a new lock memory persistor.
func NewMemoryLocker() *MemoryLocker {
// New creates a new lock memory wrapper around the provided storage.
func NewMemoryLocker(store tusd.DataStore) *MemoryLocker {
return &MemoryLocker{
DataStore: store,
locks: make(map[string]bool),
}
}

View File

@ -0,0 +1,50 @@
package memorylocker
import (
"io"
"testing"
"github.com/tus/tusd"
)
type zeroStore struct{}
func (store zeroStore) NewUpload(info tusd.FileInfo) (string, error) {
return "", nil
}
func (store zeroStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
return 0, nil
}
func (store zeroStore) GetInfo(id string) (tusd.FileInfo, error) {
return tusd.FileInfo{}, nil
}
func (store zeroStore) GetReader(id string) (io.Reader, error) {
return nil, tusd.ErrNotImplemented
}
func (store zeroStore) Terminate(id string) error {
return tusd.ErrNotImplemented
}
func TestMemoryLocker(t *testing.T) {
var locker tusd.LockerDataStore
locker = NewMemoryLocker(&zeroStore{})
if err := locker.LockUpload("one"); err != nil {
t.Errorf("unexpected error when locking file: %s", err)
}
if err := locker.LockUpload("one"); err != tusd.ErrFileLocked {
t.Errorf("expected error when locking locked file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file again: %s", err)
}
}

View File

@ -206,3 +206,87 @@ func TestPatchOverflow(t *testing.T) {
Code: http.StatusNoContent,
}).Run(handler, t)
}
const (
LOCK = iota
INFO
WRITE
UNLOCK
END
)
type lockingPatchStore struct {
zeroStore
callOrder chan int
}
func (s lockingPatchStore) GetInfo(id string) (FileInfo, error) {
s.callOrder <- INFO
return FileInfo{
Offset: 0,
Size: 20,
}, nil
}
func (s lockingPatchStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
s.callOrder <- WRITE
return 5, nil
}
func (s lockingPatchStore) LockUpload(id string) error {
s.callOrder <- LOCK
return nil
}
func (s lockingPatchStore) UnlockUpload(id string) error {
s.callOrder <- UNLOCK
return nil
}
func TestLockingPatch(t *testing.T) {
callOrder := make(chan int, 10)
handler, _ := NewHandler(Config{
DataStore: lockingPatchStore{
callOrder: callOrder,
},
})
(&httpTest{
Name: "Uploading to locking store",
Method: "PATCH",
URL: "yes",
ReqHeader: map[string]string{
"Tus-Resumable": "1.0.0",
"Content-Type": "application/offset+octet-stream",
"Upload-Offset": "0",
},
ReqBody: strings.NewReader("hello"),
Code: http.StatusNoContent,
}).Run(handler, t)
callOrder <- END
close(callOrder)
if <-callOrder != LOCK {
t.Error("expected call to LockUpload")
}
if <-callOrder != INFO {
t.Error("expected call to GetInfo")
}
if <-callOrder != WRITE {
t.Error("expected call to WriteChunk")
}
if <-callOrder != UNLOCK {
t.Error("expected call to UnlockUpload")
}
if <-callOrder != END {
t.Error("expected no more calls to happen")
}
}

View File

@ -257,6 +257,16 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request)
handler.sendError(w, r, err)
return
}
if locker, ok := handler.dataStore.(LockerDataStore); ok {
if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, err)
return
}
defer locker.UnlockUpload(id)
}
info, err := handler.dataStore.GetInfo(id)
if err != nil {
handler.sendError(w, r, err)
@ -308,6 +318,15 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
return
}
if locker, ok := handler.dataStore.(LockerDataStore); ok {
if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, err)
return
}
defer locker.UnlockUpload(id)
}
info, err := handler.dataStore.GetInfo(id)
if err != nil {
handler.sendError(w, r, err)
@ -370,6 +389,15 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return
}
if locker, ok := handler.dataStore.(LockerDataStore); ok {
if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, err)
return
}
defer locker.UnlockUpload(id)
}
info, err := handler.dataStore.GetInfo(id)
if err != nil {
handler.sendError(w, r, err)
@ -407,6 +435,15 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
return
}
if locker, ok := handler.dataStore.(LockerDataStore); ok {
if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, err)
return
}
defer locker.UnlockUpload(id)
}
err = handler.dataStore.Terminate(id)
if err != nil {
handler.sendError(w, r, err)