Separate locking into data store implementation
The goal is to allow different locking mechanism to be used for different storages. For example, in-memory for very few uploads, a file locker in conjunction with the FileStore or an external service, such as ZooKeeper.
This commit is contained in:
parent
608795b322
commit
49d7c2ff78
|
@ -0,0 +1,67 @@
|
|||
package lockingstore
|
||||
|
||||
type Locker interface {
|
||||
LockUpload(id string) error
|
||||
UnlockUpload(id string) error
|
||||
}
|
||||
|
||||
type LockingStore struct {
|
||||
tusd.DataStore
|
||||
Locker *Locker
|
||||
}
|
||||
|
||||
func (store LockingStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
|
||||
if err := store.LockUpload(id); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if unlockErr := store.UnlockUpload(id); unlockErr != nil {
|
||||
err = unlockErr
|
||||
}
|
||||
}()
|
||||
|
||||
return store.DataStore.WriteChunk(id, offset, src)
|
||||
}
|
||||
|
||||
func (store LockingStore) GetInfo(id string) (FileInfo, error) {
|
||||
if err := store.LockUpload(id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if unlockErr := store.UnlockUpload(id); unlockErr != nil {
|
||||
err = unlockErr
|
||||
}
|
||||
}()
|
||||
|
||||
return store.DataStore.GetInfo(id)
|
||||
}
|
||||
|
||||
func (store LockingStore) GetReader(id string) (io.Reader, error) {
|
||||
if err := store.LockUpload(id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if unlockErr := store.UnlockUpload(id); unlockErr != nil {
|
||||
err = unlockErr
|
||||
}
|
||||
}()
|
||||
|
||||
return store.DataStore.GetReader(id)
|
||||
}
|
||||
|
||||
func (store LockingStore) Terminate(id string) error {
|
||||
if err := store.LockUpload(id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if unlockErr := store.UnlockUpload(id); unlockErr != nil {
|
||||
err = unlockErr
|
||||
}
|
||||
}()
|
||||
|
||||
return store.DataStore.Terminate(id)
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package lockingstore
|
||||
|
||||
import (
|
||||
"github.com/tus/tusd"
|
||||
)
|
||||
|
||||
type MemoryLocker struct {
|
||||
locks map[string]bool
|
||||
}
|
||||
|
||||
func New() *MemoryLocker {
|
||||
return &MemoryLocker{
|
||||
locks: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (locker *MemoryLocker) LockUpload(id string) error {
|
||||
|
||||
// Ensure file is not locked
|
||||
if _, ok := locker.locks[id]; ok {
|
||||
return tusd.ErrFileLocked
|
||||
}
|
||||
|
||||
handler.locks[id] = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (locker *MemoryLocker) UnlockUpload(id string) error {
|
||||
// Deleting a non-existing key does not end in unexpected errors or panic
|
||||
// since this operation results in a no-op
|
||||
delete(locker.locks, id)
|
||||
|
||||
return nil
|
||||
}
|
|
@ -75,7 +75,6 @@ type UnroutedHandler struct {
|
|||
dataStore DataStore
|
||||
isBasePathAbs bool
|
||||
basePath string
|
||||
locks map[string]bool
|
||||
logger *log.Logger
|
||||
|
||||
// For each finished upload the corresponding info object will be sent using
|
||||
|
@ -114,7 +113,6 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) {
|
|||
dataStore: config.DataStore,
|
||||
basePath: base,
|
||||
isBasePathAbs: uri.IsAbs(),
|
||||
locks: make(map[string]bool),
|
||||
CompleteUploads: make(chan FileInfo),
|
||||
logger: logger,
|
||||
}
|
||||
|
@ -288,17 +286,16 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request)
|
|||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// PatchFile adds a chunk to an upload. Only allowed if the upload is not
|
||||
// locked and enough space is left.
|
||||
// PatchFile adds a chunk to an upload. Only allowed enough space is left.
|
||||
func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
//Check for presence of application/offset+octet-stream
|
||||
// Check for presence of application/offset+octet-stream
|
||||
if r.Header.Get("Content-Type") != "application/offset+octet-stream" {
|
||||
handler.sendError(w, r, ErrInvalidContentType)
|
||||
return
|
||||
}
|
||||
|
||||
//Check for presence of a valid Upload-Offset Header
|
||||
// Check for presence of a valid Upload-Offset Header
|
||||
offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64)
|
||||
if err != nil || offset < 0 {
|
||||
handler.sendError(w, r, ErrInvalidOffset)
|
||||
|
@ -311,20 +308,6 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
|
|||
return
|
||||
}
|
||||
|
||||
// Ensure file is not locked
|
||||
if _, ok := handler.locks[id]; ok {
|
||||
handler.sendError(w, r, ErrFileLocked)
|
||||
return
|
||||
}
|
||||
|
||||
// Lock file for further writes (heads are allowed)
|
||||
handler.locks[id] = true
|
||||
|
||||
// File will be unlocked regardless of an error or success
|
||||
defer func() {
|
||||
delete(handler.locks, id)
|
||||
}()
|
||||
|
||||
info, err := handler.dataStore.GetInfo(id)
|
||||
if err != nil {
|
||||
handler.sendError(w, r, err)
|
||||
|
@ -387,20 +370,6 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
|
|||
return
|
||||
}
|
||||
|
||||
// Ensure file is not locked
|
||||
if _, ok := handler.locks[id]; ok {
|
||||
handler.sendError(w, r, ErrFileLocked)
|
||||
return
|
||||
}
|
||||
|
||||
// Lock file for further writes (heads are allowed)
|
||||
handler.locks[id] = true
|
||||
|
||||
// File will be unlocked regardless of an error or success
|
||||
defer func() {
|
||||
delete(handler.locks, id)
|
||||
}()
|
||||
|
||||
info, err := handler.dataStore.GetInfo(id)
|
||||
if err != nil {
|
||||
handler.sendError(w, r, err)
|
||||
|
@ -438,20 +407,6 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
|
|||
return
|
||||
}
|
||||
|
||||
// Ensure file is not locked
|
||||
if _, ok := handler.locks[id]; ok {
|
||||
handler.sendError(w, r, ErrFileLocked)
|
||||
return
|
||||
}
|
||||
|
||||
// Lock file for further writes (heads are allowed)
|
||||
handler.locks[id] = true
|
||||
|
||||
// File will be unlocked regardless of an error or success
|
||||
defer func() {
|
||||
delete(handler.locks, id)
|
||||
}()
|
||||
|
||||
err = handler.dataStore.Terminate(id)
|
||||
if err != nil {
|
||||
handler.sendError(w, r, err)
|
||||
|
|
Loading…
Reference in New Issue