From 49d7c2ff784345a9d65ae2f50173e74c6333a2e5 Mon Sep 17 00:00:00 2001 From: Marius Date: Wed, 9 Dec 2015 20:25:08 +0100 Subject: [PATCH] Separate locking into data store implementation The goal is to allow different locking mechanism to be used for different storages. For example, in-memory for very few uploads, a file locker in conjunction with the FileStore or an external service, such as ZooKeeper. --- lockingstore/lockingstore.go | 67 ++++++++++++++++++++++++++++++++++++ lockingstore/memorylocker.go | 35 +++++++++++++++++++ unrouted_handler.go | 51 ++------------------------- 3 files changed, 105 insertions(+), 48 deletions(-) create mode 100644 lockingstore/lockingstore.go create mode 100644 lockingstore/memorylocker.go diff --git a/lockingstore/lockingstore.go b/lockingstore/lockingstore.go new file mode 100644 index 0000000..6e5b1fa --- /dev/null +++ b/lockingstore/lockingstore.go @@ -0,0 +1,67 @@ +package lockingstore + +type Locker interface { + LockUpload(id string) error + UnlockUpload(id string) error +} + +type LockingStore struct { + tusd.DataStore + Locker *Locker +} + +func (store LockingStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) { + if err := store.LockUpload(id); err != nil { + return 0, err + } + + defer func() { + if unlockErr := store.UnlockUpload(id); unlockErr != nil { + err = unlockErr + } + }() + + return store.DataStore.WriteChunk(id, offset, src) +} + +func (store LockingStore) GetInfo(id string) (FileInfo, error) { + if err := store.LockUpload(id); err != nil { + return nil, err + } + + defer func() { + if unlockErr := store.UnlockUpload(id); unlockErr != nil { + err = unlockErr + } + }() + + return store.DataStore.GetInfo(id) +} + +func (store LockingStore) GetReader(id string) (io.Reader, error) { + if err := store.LockUpload(id); err != nil { + return nil, err + } + + defer func() { + if unlockErr := store.UnlockUpload(id); unlockErr != nil { + err = unlockErr + } + }() + + return store.DataStore.GetReader(id) +} + +func (store LockingStore) Terminate(id string) error { + if err := store.LockUpload(id); err != nil { + return err + } + + defer func() { + if unlockErr := store.UnlockUpload(id); unlockErr != nil { + err = unlockErr + } + }() + + return store.DataStore.Terminate(id) +} diff --git a/lockingstore/memorylocker.go b/lockingstore/memorylocker.go new file mode 100644 index 0000000..1dd6d22 --- /dev/null +++ b/lockingstore/memorylocker.go @@ -0,0 +1,35 @@ +package lockingstore + +import ( + "github.com/tus/tusd" +) + +type MemoryLocker struct { + locks map[string]bool +} + +func New() *MemoryLocker { + return &MemoryLocker{ + locks: make(map[string]bool), + } +} + +func (locker *MemoryLocker) LockUpload(id string) error { + + // Ensure file is not locked + if _, ok := locker.locks[id]; ok { + return tusd.ErrFileLocked + } + + handler.locks[id] = true + + return nil +} + +func (locker *MemoryLocker) UnlockUpload(id string) error { + // Deleting a non-existing key does not end in unexpected errors or panic + // since this operation results in a no-op + delete(locker.locks, id) + + return nil +} diff --git a/unrouted_handler.go b/unrouted_handler.go index a980752..a17249b 100644 --- a/unrouted_handler.go +++ b/unrouted_handler.go @@ -75,7 +75,6 @@ type UnroutedHandler struct { dataStore DataStore isBasePathAbs bool basePath string - locks map[string]bool logger *log.Logger // For each finished upload the corresponding info object will be sent using @@ -114,7 +113,6 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) { dataStore: config.DataStore, basePath: base, isBasePathAbs: uri.IsAbs(), - locks: make(map[string]bool), CompleteUploads: make(chan FileInfo), logger: logger, } @@ -288,17 +286,16 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNoContent) } -// PatchFile adds a chunk to an upload. Only allowed if the upload is not -// locked and enough space is left. +// PatchFile adds a chunk to an upload. Only allowed enough space is left. func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) { - //Check for presence of application/offset+octet-stream + // Check for presence of application/offset+octet-stream if r.Header.Get("Content-Type") != "application/offset+octet-stream" { handler.sendError(w, r, ErrInvalidContentType) return } - //Check for presence of a valid Upload-Offset Header + // Check for presence of a valid Upload-Offset Header offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64) if err != nil || offset < 0 { handler.sendError(w, r, ErrInvalidOffset) @@ -311,20 +308,6 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request return } - // Ensure file is not locked - if _, ok := handler.locks[id]; ok { - handler.sendError(w, r, ErrFileLocked) - return - } - - // Lock file for further writes (heads are allowed) - handler.locks[id] = true - - // File will be unlocked regardless of an error or success - defer func() { - delete(handler.locks, id) - }() - info, err := handler.dataStore.GetInfo(id) if err != nil { handler.sendError(w, r, err) @@ -387,20 +370,6 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request) return } - // Ensure file is not locked - if _, ok := handler.locks[id]; ok { - handler.sendError(w, r, ErrFileLocked) - return - } - - // Lock file for further writes (heads are allowed) - handler.locks[id] = true - - // File will be unlocked regardless of an error or success - defer func() { - delete(handler.locks, id) - }() - info, err := handler.dataStore.GetInfo(id) if err != nil { handler.sendError(w, r, err) @@ -438,20 +407,6 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request) return } - // Ensure file is not locked - if _, ok := handler.locks[id]; ok { - handler.sendError(w, r, ErrFileLocked) - return - } - - // Lock file for further writes (heads are allowed) - handler.locks[id] = true - - // File will be unlocked regardless of an error or success - defer func() { - delete(handler.locks, id) - }() - err = handler.dataStore.Terminate(id) if err != nil { handler.sendError(w, r, err)