From f6a5530df833fd62f54dc60943f141624b068d90 Mon Sep 17 00:00:00 2001 From: Marius Date: Sat, 26 Dec 2015 21:23:09 +0100 Subject: [PATCH] Integrate LockingStore into UnroutedHandler The reason behind this drastic move was that sometimes a lock needs to persist across multiple calls to a DataStore. For example, when a PATCH request is received, following behaviour is wanted: * Obtain lock (LockUpload) * Get offset, size etc (GetInfo) * Write data (WriteChunk) * Release lock (UnlockUpload) However, before this change, the lock would be release and then obtained again after the GetInfo and before the WriteChunk call. This resulted in an inefficient resource usage and even a possible race condition. The effects of this change was: * FileLocker is now directly integrated into FileStore and not sperarated * LockingStore and the entire package has been removed * MemoryLocker has been moved into its very own package --- datastore.go | 21 ++++ filestore/filelock.go | 65 ------------ filestore/filelock_test.go | 35 ------- filestore/filestore.go | 77 ++++++++++++--- filestore/filestore_test.go | 26 +++++ lockingstore/lockingstore.go | 99 ------------------- lockingstore/lockingstore_test.go | 82 --------------- lockingstore/memorylocker_test.go | 28 ------ .../memorylocker.go | 20 +++- memorylocker/memorylocker_test.go | 50 ++++++++++ patch_test.go | 84 ++++++++++++++++ unrouted_handler.go | 37 +++++++ 12 files changed, 296 insertions(+), 328 deletions(-) delete mode 100644 filestore/filelock.go delete mode 100644 filestore/filelock_test.go delete mode 100644 lockingstore/lockingstore.go delete mode 100644 lockingstore/lockingstore_test.go delete mode 100644 lockingstore/memorylocker_test.go rename {lockingstore => memorylocker}/memorylocker.go (53%) create mode 100644 memorylocker/memorylocker_test.go diff --git a/datastore.go b/datastore.go index 8c335dc..c891c9f 100644 --- a/datastore.go +++ b/datastore.go @@ -55,3 +55,24 @@ type DataStore interface { // and writing, must return os.ErrNotExist or similar. Terminate(id string) error } + +// LockerDataStore is the interface required for custom lock persisting mechanisms. +// Common ways to store this information is in memory, on disk or using an +// external service, such as ZooKeeper. +// When multiple processes are attempting to access an upload, whether it be +// by reading or writing, a syncronization mechanism is required to prevent +// data corruption, especially to ensure correct offset values and the proper +// order of chunks inside a single upload. +type LockerDataStore interface { + DataStore + + // LockUpload attempts to obtain an exclusive lock for the upload specified + // by its id. + // If this operation fails because the resource is already locked, the + // tusd.ErrFileLocked must be returned. If no error is returned, the attempt + // is consider to be successful and the upload to be locked until UnlockUpload + // is invoked for the same upload. + LockUpload(id string) error + // UnlockUpload releases an existing lock for the given upload. + UnlockUpload(id string) error +} diff --git a/filestore/filelock.go b/filestore/filelock.go deleted file mode 100644 index 8022820..0000000 --- a/filestore/filelock.go +++ /dev/null @@ -1,65 +0,0 @@ -package filestore - -import ( - "os" - "path/filepath" - - "github.com/nightlyone/lockfile" - "github.com/tus/tusd" -) - -// FileLocker provides an exclusive upload locking mechansim using lock files -// which are stored on disk. Each of them stores the PID of the process which -// aquired the lock. This allows locks to be automatically freed when a process -// is unable to release it on its own because the process is not alive anymore. -// -// Please consult the package lockingstore for instructions on how to use this -// locker. -type FileLocker struct { - // Relative or absolute path to store the locks in. - Path string -} - -func (locker FileLocker) LockUpload(id string) error { - lock, err := locker.newLock(id) - if err != nil { - return err - } - - err = lock.TryLock() - if err == lockfile.ErrBusy { - return tusd.ErrFileLocked - } - - return err -} - -func (locker FileLocker) UnlockUpload(id string) error { - lock, err := locker.newLock(id) - if err != nil { - return err - } - - err = lock.Unlock() - - // A "no such file or directory" will be returned if no lockfile was found. - // Since this means that the file has never been locked, we drop the error - // and continue as if nothing happend. - if os.IsNotExist(err) { - err = nil - } - - return nil -} - -func (locker FileLocker) newLock(id string) (lockfile.Lockfile, error) { - path, err := filepath.Abs(locker.Path + "/" + id + ".lock") - if err != nil { - return lockfile.Lockfile(""), err - } - - // We use Lockfile directly instead of lockfile.New to bypass the unnecessary - // check whether the provided path is absolute since we just resolved it - // on our own. - return lockfile.Lockfile(path), nil -} diff --git a/filestore/filelock_test.go b/filestore/filelock_test.go deleted file mode 100644 index 71c5c97..0000000 --- a/filestore/filelock_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package filestore - -import ( - "io/ioutil" - "testing" - - "github.com/tus/tusd" - "github.com/tus/tusd/lockingstore" -) - -func TestFileLocker(t *testing.T) { - dir, err := ioutil.TempDir("", "tusd-file-locker") - if err != nil { - t.Fatal(err) - } - - var locker lockingstore.Locker - locker = FileLocker{dir} - - if err := locker.LockUpload("one"); err != nil { - t.Errorf("unexpected error when locking file: %s", err) - } - - if err := locker.LockUpload("one"); err != tusd.ErrFileLocked { - t.Errorf("expected error when locking locked file: %s", err) - } - - if err := locker.UnlockUpload("one"); err != nil { - t.Errorf("unexpected error when unlocking file: %s", err) - } - - if err := locker.UnlockUpload("one"); err != nil { - t.Errorf("unexpected error when unlocking file again: %s", err) - } -} diff --git a/filestore/filestore.go b/filestore/filestore.go index 56e52e4..268c28c 100644 --- a/filestore/filestore.go +++ b/filestore/filestore.go @@ -6,6 +6,13 @@ // `[id].bin` files contain the raw binary data uploaded. // No cleanup is performed so you may want to run a cronjob to ensure your disk // is not filled up with old and finished uploads. +// +// In addition, it provides an exclusive upload locking mechansim using lock files +// which are stored on disk. Each of them stores the PID of the process which +// aquired the lock. This allows locks to be automatically freed when a process +// is unable to release it on its own because the process is not alive anymore. +// For more information, consult the documentation for tusd.LockerDataStore +// interface, which is implemented by FileStore package filestore import ( @@ -13,10 +20,12 @@ import ( "io" "io/ioutil" "os" + "path/filepath" "github.com/tus/tusd" - "github.com/tus/tusd/lockingstore" "github.com/tus/tusd/uid" + + "github.com/nightlyone/lockfile" ) var defaultFilePerm = os.FileMode(0775) @@ -32,16 +41,9 @@ type FileStore struct { // New creates a new file based storage backend. The directory specified will // be used as the only storage entry. This method does not check // whether the path exists, use os.MkdirAll to ensure. -// In addition, a locking mechanism is provided using lockingstore.LockingStore -// and FileLocker. -func New(path string) tusd.DataStore { - store := FileStore{path} - locker := FileLocker{path} - - return lockingstore.LockingStore{ - DataStore: &store, - Locker: &locker, - } +// In addition, a locking mechanism is provided. +func New(path string) FileStore { + return FileStore{path} } func (store FileStore) NewUpload(info tusd.FileInfo) (id string, err error) { @@ -100,17 +102,62 @@ func (store FileStore) Terminate(id string) error { return nil } -// Return the path to the .bin storing the binary data +func (store FileStore) LockUpload(id string) error { + lock, err := store.newLock(id) + if err != nil { + return err + } + + err = lock.TryLock() + if err == lockfile.ErrBusy { + return tusd.ErrFileLocked + } + + return err +} + +func (store FileStore) UnlockUpload(id string) error { + lock, err := store.newLock(id) + if err != nil { + return err + } + + err = lock.Unlock() + + // A "no such file or directory" will be returned if no lockfile was found. + // Since this means that the file has never been locked, we drop the error + // and continue as if nothing happend. + if os.IsNotExist(err) { + err = nil + } + + return nil +} + +// newLock contructs a new Lockfile instance. +func (store FileStore) newLock(id string) (lockfile.Lockfile, error) { + path, err := filepath.Abs(store.Path + "/" + id + ".lock") + if err != nil { + return lockfile.Lockfile(""), err + } + + // We use Lockfile directly instead of lockfile.New to bypass the unnecessary + // check whether the provided path is absolute since we just resolved it + // on our own. + return lockfile.Lockfile(path), nil +} + +// binPath returns the path to the .bin storing the binary data. func (store FileStore) binPath(id string) string { return store.Path + "/" + id + ".bin" } -// Return the path to the .info file storing the file's info +// infoPath returns the path to the .info file storing the file's info. func (store FileStore) infoPath(id string) string { return store.Path + "/" + id + ".info" } -// Update the entire information. Everything will be overwritten. +// writeInfo updates the entire information. Everything will be overwritten. func (store FileStore) writeInfo(id string, info tusd.FileInfo) error { data, err := json.Marshal(info) if err != nil { @@ -119,7 +166,7 @@ func (store FileStore) writeInfo(id string, info tusd.FileInfo) error { return ioutil.WriteFile(store.infoPath(id), data, defaultFilePerm) } -// Update the .info file using the new upload. +// setOffset updates the .info file to match the new offset. func (store FileStore) setOffset(id string, offset int64) error { info, err := store.GetInfo(id) if err != nil { diff --git a/filestore/filestore_test.go b/filestore/filestore_test.go index 74c4dfc..06da9eb 100644 --- a/filestore/filestore_test.go +++ b/filestore/filestore_test.go @@ -95,3 +95,29 @@ func TestFilestore(t *testing.T) { t.Fatal("expected os.ErrIsNotExist") } } + +func TestFileLocker(t *testing.T) { + dir, err := ioutil.TempDir("", "tusd-file-locker") + if err != nil { + t.Fatal(err) + } + + var locker tusd.LockerDataStore + locker = FileStore{dir} + + if err := locker.LockUpload("one"); err != nil { + t.Errorf("unexpected error when locking file: %s", err) + } + + if err := locker.LockUpload("one"); err != tusd.ErrFileLocked { + t.Errorf("expected error when locking locked file: %s", err) + } + + if err := locker.UnlockUpload("one"); err != nil { + t.Errorf("unexpected error when unlocking file: %s", err) + } + + if err := locker.UnlockUpload("one"); err != nil { + t.Errorf("unexpected error when unlocking file again: %s", err) + } +} diff --git a/lockingstore/lockingstore.go b/lockingstore/lockingstore.go deleted file mode 100644 index 13019af..0000000 --- a/lockingstore/lockingstore.go +++ /dev/null @@ -1,99 +0,0 @@ -// Package lockingstore manages concurrent access to a single upload. -// -// When multiple processes are attempting to access an upload, whether it be -// by reading or writing, a syncronization mechanism is required to prevent -// data corruption, especially to ensure correct offset values and the proper -// order of chunks inside a single upload. -// -// This package wrappes an existing data storage and only allows a single access -// at a time by using an exclusive locking mechanism. -package lockingstore - -import ( - "io" - - "github.com/tus/tusd" -) - -// Locker is the interface required for custom lock persisting mechanisms. -// Common ways to store this information is in memory, on disk or using an -// external service, such as ZooKeeper. -type Locker interface { - // LockUpload attempts to obtain an exclusive lock for the upload specified - // by its id. - // If this operation fails because the resource is already locked, the - // tusd.ErrFileLocked must be returned. If no error is returned, the attempt - // is consider to be successful and the upload to be locked until UnlockUpload - // is invoked for the same upload. - LockUpload(id string) error - // UnlockUpload releases an existing lock for the given upload. - UnlockUpload(id string) error -} - -// LockingStore wraps an existing data storage and catches all operation. -// Before passing the method calls to the underlying backend, locks are required -// to be obtained. -type LockingStore struct { - // The underlying data storage to which the operation will be passed if an - // upload is not locked. - tusd.DataStore - // The custom locking persisting mechanism used for obtaining and releasing - // locks. - Locker Locker -} - -func (store LockingStore) WriteChunk(id string, offset int64, src io.Reader) (n int64, err error) { - if err := store.Locker.LockUpload(id); err != nil { - return 0, err - } - - defer func() { - if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil { - err = unlockErr - } - }() - - return store.DataStore.WriteChunk(id, offset, src) -} - -func (store LockingStore) GetInfo(id string) (info tusd.FileInfo, err error) { - if err := store.Locker.LockUpload(id); err != nil { - return info, err - } - - defer func() { - if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil { - err = unlockErr - } - }() - - return store.DataStore.GetInfo(id) -} - -func (store LockingStore) GetReader(id string) (src io.Reader, err error) { - if err := store.Locker.LockUpload(id); err != nil { - return nil, err - } - - defer func() { - if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil { - err = unlockErr - } - }() - - return store.DataStore.GetReader(id) -} - -func (store LockingStore) Terminate(id string) (err error) { - if err := store.Locker.LockUpload(id); err != nil { - return err - } - - defer func() { - if unlockErr := store.Locker.UnlockUpload(id); unlockErr != nil { - err = unlockErr - } - }() - - return store.DataStore.Terminate(id) -} diff --git a/lockingstore/lockingstore_test.go b/lockingstore/lockingstore_test.go deleted file mode 100644 index 4fd3abc..0000000 --- a/lockingstore/lockingstore_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package lockingstore_test - -import ( - "io" - "testing" - - "github.com/tus/tusd" - . "github.com/tus/tusd/lockingstore" -) - -type store struct { - calls int -} - -func (store *store) NewUpload(info tusd.FileInfo) (string, error) { - return "", nil -} -func (store *store) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { - store.calls += 1 - return 0, nil -} - -func (store *store) GetInfo(id string) (tusd.FileInfo, error) { - store.calls += 1 - return tusd.FileInfo{}, nil -} - -func (store *store) GetReader(id string) (io.Reader, error) { - store.calls += 1 - return nil, nil -} - -func (store *store) Terminate(id string) error { - store.calls += 1 - return nil -} - -type locker struct { - lockCalls int - unlockCalls int -} - -func (locker *locker) LockUpload(id string) error { - locker.lockCalls += 1 - if id == "no" { - return tusd.ErrFileLocked - } - return nil -} - -func (locker *locker) UnlockUpload(id string) error { - locker.unlockCalls += 1 - return nil -} - -func TestLockingStore(t *testing.T) { - locker := new(locker) - store := new(store) - lstore := LockingStore{ - DataStore: store, - Locker: locker, - } - - lstore.NewUpload(tusd.FileInfo{}) - lstore.WriteChunk("", 0, nil) - lstore.GetInfo("") - lstore.GetReader("") - lstore.Terminate("") - - lstore.WriteChunk("no", 0, nil) - lstore.GetInfo("no") - lstore.GetReader("no") - lstore.Terminate("no") - - if locker.lockCalls != 8 { - t.Error("expected 8 calls to LockUpload, but got %d", locker.lockCalls) - } - - if locker.unlockCalls != 4 { - t.Error("expected 8 calls to UnlockUpload, but got %d", locker.unlockCalls) - } -} diff --git a/lockingstore/memorylocker_test.go b/lockingstore/memorylocker_test.go deleted file mode 100644 index d14fc7b..0000000 --- a/lockingstore/memorylocker_test.go +++ /dev/null @@ -1,28 +0,0 @@ -package lockingstore - -import ( - "testing" - - "github.com/tus/tusd" -) - -func TestMemoryLocker(t *testing.T) { - var locker Locker - locker = NewMemoryLocker() - - if err := locker.LockUpload("one"); err != nil { - t.Errorf("unexpected error when locking file: %s", err) - } - - if err := locker.LockUpload("one"); err != tusd.ErrFileLocked { - t.Errorf("expected error when locking locked file: %s", err) - } - - if err := locker.UnlockUpload("one"); err != nil { - t.Errorf("unexpected error when unlocking file: %s", err) - } - - if err := locker.UnlockUpload("one"); err != nil { - t.Errorf("unexpected error when unlocking file again: %s", err) - } -} diff --git a/lockingstore/memorylocker.go b/memorylocker/memorylocker.go similarity index 53% rename from lockingstore/memorylocker.go rename to memorylocker/memorylocker.go index d15ecc0..a565a7e 100644 --- a/lockingstore/memorylocker.go +++ b/memorylocker/memorylocker.go @@ -1,4 +1,14 @@ -package lockingstore +// Package memorylocker provides an in-memory locking mechanism +// +// When multiple processes are attempting to access an upload, whether it be +// by reading or writing, a syncronization mechanism is required to prevent +// data corruption, especially to ensure correct offset values and the proper +// order of chunks inside a single upload. +// +// MemoryLocker persists locks using memory and therefore allowing a simple and +// cheap mechansim. Locks will only exist as long as this object is kept in +// reference and will be erased if the program exits. +package memorylocker import ( "github.com/tus/tusd" @@ -8,13 +18,15 @@ import ( // cheap mechansim. Locks will only exist as long as this object is kept in // reference and will be erased if the program exits. type MemoryLocker struct { + tusd.DataStore locks map[string]bool } -// New creates a new lock memory persistor. -func NewMemoryLocker() *MemoryLocker { +// New creates a new lock memory wrapper around the provided storage. +func NewMemoryLocker(store tusd.DataStore) *MemoryLocker { return &MemoryLocker{ - locks: make(map[string]bool), + DataStore: store, + locks: make(map[string]bool), } } diff --git a/memorylocker/memorylocker_test.go b/memorylocker/memorylocker_test.go new file mode 100644 index 0000000..6f832b8 --- /dev/null +++ b/memorylocker/memorylocker_test.go @@ -0,0 +1,50 @@ +package memorylocker + +import ( + "io" + "testing" + + "github.com/tus/tusd" +) + +type zeroStore struct{} + +func (store zeroStore) NewUpload(info tusd.FileInfo) (string, error) { + return "", nil +} +func (store zeroStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { + return 0, nil +} + +func (store zeroStore) GetInfo(id string) (tusd.FileInfo, error) { + return tusd.FileInfo{}, nil +} + +func (store zeroStore) GetReader(id string) (io.Reader, error) { + return nil, tusd.ErrNotImplemented +} + +func (store zeroStore) Terminate(id string) error { + return tusd.ErrNotImplemented +} + +func TestMemoryLocker(t *testing.T) { + var locker tusd.LockerDataStore + locker = NewMemoryLocker(&zeroStore{}) + + if err := locker.LockUpload("one"); err != nil { + t.Errorf("unexpected error when locking file: %s", err) + } + + if err := locker.LockUpload("one"); err != tusd.ErrFileLocked { + t.Errorf("expected error when locking locked file: %s", err) + } + + if err := locker.UnlockUpload("one"); err != nil { + t.Errorf("unexpected error when unlocking file: %s", err) + } + + if err := locker.UnlockUpload("one"); err != nil { + t.Errorf("unexpected error when unlocking file again: %s", err) + } +} diff --git a/patch_test.go b/patch_test.go index 1777e4d..3f2c4ca 100644 --- a/patch_test.go +++ b/patch_test.go @@ -206,3 +206,87 @@ func TestPatchOverflow(t *testing.T) { Code: http.StatusNoContent, }).Run(handler, t) } + +const ( + LOCK = iota + INFO + WRITE + UNLOCK + END +) + +type lockingPatchStore struct { + zeroStore + callOrder chan int +} + +func (s lockingPatchStore) GetInfo(id string) (FileInfo, error) { + s.callOrder <- INFO + + return FileInfo{ + Offset: 0, + Size: 20, + }, nil +} + +func (s lockingPatchStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { + s.callOrder <- WRITE + + return 5, nil +} + +func (s lockingPatchStore) LockUpload(id string) error { + s.callOrder <- LOCK + return nil +} + +func (s lockingPatchStore) UnlockUpload(id string) error { + s.callOrder <- UNLOCK + return nil +} + +func TestLockingPatch(t *testing.T) { + callOrder := make(chan int, 10) + + handler, _ := NewHandler(Config{ + DataStore: lockingPatchStore{ + callOrder: callOrder, + }, + }) + + (&httpTest{ + Name: "Uploading to locking store", + Method: "PATCH", + URL: "yes", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Content-Type": "application/offset+octet-stream", + "Upload-Offset": "0", + }, + ReqBody: strings.NewReader("hello"), + Code: http.StatusNoContent, + }).Run(handler, t) + + callOrder <- END + close(callOrder) + + if <-callOrder != LOCK { + t.Error("expected call to LockUpload") + } + + if <-callOrder != INFO { + t.Error("expected call to GetInfo") + } + + if <-callOrder != WRITE { + t.Error("expected call to WriteChunk") + } + + if <-callOrder != UNLOCK { + t.Error("expected call to UnlockUpload") + } + + if <-callOrder != END { + t.Error("expected no more calls to happen") + } +} diff --git a/unrouted_handler.go b/unrouted_handler.go index a17249b..409fb36 100644 --- a/unrouted_handler.go +++ b/unrouted_handler.go @@ -257,6 +257,16 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request) handler.sendError(w, r, err) return } + + if locker, ok := handler.dataStore.(LockerDataStore); ok { + if err := locker.LockUpload(id); err != nil { + handler.sendError(w, r, err) + return + } + + defer locker.UnlockUpload(id) + } + info, err := handler.dataStore.GetInfo(id) if err != nil { handler.sendError(w, r, err) @@ -308,6 +318,15 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request return } + if locker, ok := handler.dataStore.(LockerDataStore); ok { + if err := locker.LockUpload(id); err != nil { + handler.sendError(w, r, err) + return + } + + defer locker.UnlockUpload(id) + } + info, err := handler.dataStore.GetInfo(id) if err != nil { handler.sendError(w, r, err) @@ -370,6 +389,15 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request) return } + if locker, ok := handler.dataStore.(LockerDataStore); ok { + if err := locker.LockUpload(id); err != nil { + handler.sendError(w, r, err) + return + } + + defer locker.UnlockUpload(id) + } + info, err := handler.dataStore.GetInfo(id) if err != nil { handler.sendError(w, r, err) @@ -407,6 +435,15 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) return } + if locker, ok := handler.dataStore.(LockerDataStore); ok { + if err := locker.LockUpload(id); err != nil { + handler.sendError(w, r, err) + return + } + + defer locker.UnlockUpload(id) + } + err = handler.dataStore.Terminate(id) if err != nil { handler.sendError(w, r, err)