Merge pull request #37 from tus/lockingstore

Make locking implementation flexible
This commit is contained in:
Marius 2015-12-26 21:36:40 +01:00
commit b80b018a80
18 changed files with 370 additions and 60 deletions

View File

@ -44,9 +44,7 @@ func main() {
} }
var store tusd.DataStore var store tusd.DataStore
store = filestore.FileStore{ store = filestore.New(dir)
Path: dir,
}
if storeSize > 0 { if storeSize > 0 {
store = limitedstore.New(storeSize, store) store = limitedstore.New(storeSize, store)

View File

@ -1,4 +1,4 @@
package tusd package tusd_test
import ( import (
"io" "io"
@ -7,6 +7,8 @@ import (
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
. "github.com/tus/tusd"
) )
type concatPartialStore struct { type concatPartialStore struct {

View File

@ -1,8 +1,10 @@
package tusd package tusd_test
import ( import (
"net/http" "net/http"
"testing" "testing"
. "github.com/tus/tusd"
) )
func TestCORS(t *testing.T) { func TestCORS(t *testing.T) {

View File

@ -55,3 +55,24 @@ type DataStore interface {
// and writing, must return os.ErrNotExist or similar. // and writing, must return os.ErrNotExist or similar.
Terminate(id string) error Terminate(id string) error
} }
// LockerDataStore is the interface required for custom lock persisting mechanisms.
// Common ways to store this information is in memory, on disk or using an
// external service, such as ZooKeeper.
// When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a syncronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper
// order of chunks inside a single upload.
type LockerDataStore interface {
DataStore
// LockUpload attempts to obtain an exclusive lock for the upload specified
// by its id.
// If this operation fails because the resource is already locked, the
// tusd.ErrFileLocked must be returned. If no error is returned, the attempt
// is consider to be successful and the upload to be locked until UnlockUpload
// is invoked for the same upload.
LockUpload(id string) error
// UnlockUpload releases an existing lock for the given upload.
UnlockUpload(id string) error
}

View File

@ -1,9 +1,18 @@
// Package filestore provide a storage backend based on the local file system.
//
// FileStore is a storage backend used as a tusd.DataStore in tusd.NewHandler. // FileStore is a storage backend used as a tusd.DataStore in tusd.NewHandler.
// It stores the uploads in a directory specified in two different files: The // It stores the uploads in a directory specified in two different files: The
// `[id].info` files are used to store the fileinfo in JSON format. The // `[id].info` files are used to store the fileinfo in JSON format. The
// `[id].bin` files contain the raw binary data uploaded. // `[id].bin` files contain the raw binary data uploaded.
// No cleanup is performed so you may want to run a cronjob to ensure your disk // No cleanup is performed so you may want to run a cronjob to ensure your disk
// is not filled up with old and finished uploads. // is not filled up with old and finished uploads.
//
// In addition, it provides an exclusive upload locking mechansim using lock files
// which are stored on disk. Each of them stores the PID of the process which
// aquired the lock. This allows locks to be automatically freed when a process
// is unable to release it on its own because the process is not alive anymore.
// For more information, consult the documentation for tusd.LockerDataStore
// interface, which is implemented by FileStore
package filestore package filestore
import ( import (
@ -11,9 +20,12 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"github.com/tus/tusd" "github.com/tus/tusd"
"github.com/tus/tusd/uid" "github.com/tus/tusd/uid"
"github.com/nightlyone/lockfile"
) )
var defaultFilePerm = os.FileMode(0775) var defaultFilePerm = os.FileMode(0775)
@ -22,10 +34,18 @@ var defaultFilePerm = os.FileMode(0775)
// methods. // methods.
type FileStore struct { type FileStore struct {
// Relative or absolute path to store files in. FileStore does not check // Relative or absolute path to store files in. FileStore does not check
// whether the path exists, you os.MkdirAll in this case on your own. // whether the path exists, use os.MkdirAll in this case on your own.
Path string Path string
} }
// New creates a new file based storage backend. The directory specified will
// be used as the only storage entry. This method does not check
// whether the path exists, use os.MkdirAll to ensure.
// In addition, a locking mechanism is provided.
func New(path string) FileStore {
return FileStore{path}
}
func (store FileStore) NewUpload(info tusd.FileInfo) (id string, err error) { func (store FileStore) NewUpload(info tusd.FileInfo) (id string, err error) {
id = uid.Uid() id = uid.Uid()
info.ID = id info.ID = id
@ -82,17 +102,62 @@ func (store FileStore) Terminate(id string) error {
return nil return nil
} }
// Return the path to the .bin storing the binary data func (store FileStore) LockUpload(id string) error {
lock, err := store.newLock(id)
if err != nil {
return err
}
err = lock.TryLock()
if err == lockfile.ErrBusy {
return tusd.ErrFileLocked
}
return err
}
func (store FileStore) UnlockUpload(id string) error {
lock, err := store.newLock(id)
if err != nil {
return err
}
err = lock.Unlock()
// A "no such file or directory" will be returned if no lockfile was found.
// Since this means that the file has never been locked, we drop the error
// and continue as if nothing happend.
if os.IsNotExist(err) {
err = nil
}
return nil
}
// newLock contructs a new Lockfile instance.
func (store FileStore) newLock(id string) (lockfile.Lockfile, error) {
path, err := filepath.Abs(store.Path + "/" + id + ".lock")
if err != nil {
return lockfile.Lockfile(""), err
}
// We use Lockfile directly instead of lockfile.New to bypass the unnecessary
// check whether the provided path is absolute since we just resolved it
// on our own.
return lockfile.Lockfile(path), nil
}
// binPath returns the path to the .bin storing the binary data.
func (store FileStore) binPath(id string) string { func (store FileStore) binPath(id string) string {
return store.Path + "/" + id + ".bin" return store.Path + "/" + id + ".bin"
} }
// Return the path to the .info file storing the file's info // infoPath returns the path to the .info file storing the file's info.
func (store FileStore) infoPath(id string) string { func (store FileStore) infoPath(id string) string {
return store.Path + "/" + id + ".info" return store.Path + "/" + id + ".info"
} }
// Update the entire information. Everything will be overwritten. // writeInfo updates the entire information. Everything will be overwritten.
func (store FileStore) writeInfo(id string, info tusd.FileInfo) error { func (store FileStore) writeInfo(id string, info tusd.FileInfo) error {
data, err := json.Marshal(info) data, err := json.Marshal(info)
if err != nil { if err != nil {
@ -101,7 +166,7 @@ func (store FileStore) writeInfo(id string, info tusd.FileInfo) error {
return ioutil.WriteFile(store.infoPath(id), data, defaultFilePerm) return ioutil.WriteFile(store.infoPath(id), data, defaultFilePerm)
} }
// Update the .info file using the new upload. // setOffset updates the .info file to match the new offset.
func (store FileStore) setOffset(id string, offset int64) error { func (store FileStore) setOffset(id string, offset int64) error {
info, err := store.GetInfo(id) info, err := store.GetInfo(id)
if err != nil { if err != nil {

View File

@ -95,3 +95,29 @@ func TestFilestore(t *testing.T) {
t.Fatal("expected os.ErrIsNotExist") t.Fatal("expected os.ErrIsNotExist")
} }
} }
func TestFileLocker(t *testing.T) {
dir, err := ioutil.TempDir("", "tusd-file-locker")
if err != nil {
t.Fatal(err)
}
var locker tusd.LockerDataStore
locker = FileStore{dir}
if err := locker.LockUpload("one"); err != nil {
t.Errorf("unexpected error when locking file: %s", err)
}
if err := locker.LockUpload("one"); err != tusd.ErrFileLocked {
t.Errorf("expected error when locking locked file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file again: %s", err)
}
}

View File

@ -1,4 +1,4 @@
package tusd package tusd_test
import ( import (
"io" "io"
@ -6,6 +6,8 @@ import (
"os" "os"
"strings" "strings"
"testing" "testing"
. "github.com/tus/tusd"
) )
type getStore struct { type getStore struct {

View File

@ -1,3 +1,4 @@
// Package tusd provides ways to accept tusd calls using HTTP.
package tusd package tusd
import ( import (

View File

@ -1,4 +1,4 @@
package tusd package tusd_test
import ( import (
"io" "io"
@ -7,6 +7,8 @@ import (
"os" "os"
"strings" "strings"
"testing" "testing"
. "github.com/tus/tusd"
) )
type zeroStore struct{} type zeroStore struct{}

View File

@ -1,9 +1,11 @@
package tusd package tusd_test
import ( import (
"net/http" "net/http"
"os" "os"
"testing" "testing"
. "github.com/tus/tusd"
) )
type headStore struct { type headStore struct {

View File

@ -1,4 +1,6 @@
// Package limitedstore implements a simple wrapper around existing // Package limitedstore provides a storage with a limited space.
//
// This goal is achieved by using a simple wrapper around existing
// datastores (tusd.DataStore) while limiting the used storage size. // datastores (tusd.DataStore) while limiting the used storage size.
// It will start terminating existing uploads if not enough space is left in // It will start terminating existing uploads if not enough space is left in
// order to create a new upload. // order to create a new upload.

View File

@ -0,0 +1,53 @@
// Package memorylocker provides an in-memory locking mechanism
//
// When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a syncronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper
// order of chunks inside a single upload.
//
// MemoryLocker persists locks using memory and therefore allowing a simple and
// cheap mechansim. Locks will only exist as long as this object is kept in
// reference and will be erased if the program exits.
package memorylocker
import (
"github.com/tus/tusd"
)
// MemoryLocker persists locks using memory and therefore allowing a simple and
// cheap mechansim. Locks will only exist as long as this object is kept in
// reference and will be erased if the program exits.
type MemoryLocker struct {
tusd.DataStore
locks map[string]bool
}
// New creates a new lock memory wrapper around the provided storage.
func NewMemoryLocker(store tusd.DataStore) *MemoryLocker {
return &MemoryLocker{
DataStore: store,
locks: make(map[string]bool),
}
}
// LockUpload tries to obtain the exclusive lock.
func (locker *MemoryLocker) LockUpload(id string) error {
// Ensure file is not locked
if _, ok := locker.locks[id]; ok {
return tusd.ErrFileLocked
}
locker.locks[id] = true
return nil
}
// UnlockUpload releases a lock. If no such lock exists, no error will be returned.
func (locker *MemoryLocker) UnlockUpload(id string) error {
// Deleting a non-existing key does not end in unexpected errors or panic
// since this operation results in a no-op
delete(locker.locks, id)
return nil
}

View File

@ -0,0 +1,50 @@
package memorylocker
import (
"io"
"testing"
"github.com/tus/tusd"
)
type zeroStore struct{}
func (store zeroStore) NewUpload(info tusd.FileInfo) (string, error) {
return "", nil
}
func (store zeroStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
return 0, nil
}
func (store zeroStore) GetInfo(id string) (tusd.FileInfo, error) {
return tusd.FileInfo{}, nil
}
func (store zeroStore) GetReader(id string) (io.Reader, error) {
return nil, tusd.ErrNotImplemented
}
func (store zeroStore) Terminate(id string) error {
return tusd.ErrNotImplemented
}
func TestMemoryLocker(t *testing.T) {
var locker tusd.LockerDataStore
locker = NewMemoryLocker(&zeroStore{})
if err := locker.LockUpload("one"); err != nil {
t.Errorf("unexpected error when locking file: %s", err)
}
if err := locker.LockUpload("one"); err != tusd.ErrFileLocked {
t.Errorf("expected error when locking locked file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file: %s", err)
}
if err := locker.UnlockUpload("one"); err != nil {
t.Errorf("unexpected error when unlocking file again: %s", err)
}
}

View File

@ -1,8 +1,10 @@
package tusd package tusd_test
import ( import (
"net/http" "net/http"
"testing" "testing"
. "github.com/tus/tusd"
) )
func TestOptions(t *testing.T) { func TestOptions(t *testing.T) {

View File

@ -1,4 +1,4 @@
package tusd package tusd_test
import ( import (
"io" "io"
@ -7,6 +7,8 @@ import (
"os" "os"
"strings" "strings"
"testing" "testing"
. "github.com/tus/tusd"
) )
type patchStore struct { type patchStore struct {
@ -204,3 +206,87 @@ func TestPatchOverflow(t *testing.T) {
Code: http.StatusNoContent, Code: http.StatusNoContent,
}).Run(handler, t) }).Run(handler, t)
} }
const (
LOCK = iota
INFO
WRITE
UNLOCK
END
)
type lockingPatchStore struct {
zeroStore
callOrder chan int
}
func (s lockingPatchStore) GetInfo(id string) (FileInfo, error) {
s.callOrder <- INFO
return FileInfo{
Offset: 0,
Size: 20,
}, nil
}
func (s lockingPatchStore) WriteChunk(id string, offset int64, src io.Reader) (int64, error) {
s.callOrder <- WRITE
return 5, nil
}
func (s lockingPatchStore) LockUpload(id string) error {
s.callOrder <- LOCK
return nil
}
func (s lockingPatchStore) UnlockUpload(id string) error {
s.callOrder <- UNLOCK
return nil
}
func TestLockingPatch(t *testing.T) {
callOrder := make(chan int, 10)
handler, _ := NewHandler(Config{
DataStore: lockingPatchStore{
callOrder: callOrder,
},
})
(&httpTest{
Name: "Uploading to locking store",
Method: "PATCH",
URL: "yes",
ReqHeader: map[string]string{
"Tus-Resumable": "1.0.0",
"Content-Type": "application/offset+octet-stream",
"Upload-Offset": "0",
},
ReqBody: strings.NewReader("hello"),
Code: http.StatusNoContent,
}).Run(handler, t)
callOrder <- END
close(callOrder)
if <-callOrder != LOCK {
t.Error("expected call to LockUpload")
}
if <-callOrder != INFO {
t.Error("expected call to GetInfo")
}
if <-callOrder != WRITE {
t.Error("expected call to WriteChunk")
}
if <-callOrder != UNLOCK {
t.Error("expected call to UnlockUpload")
}
if <-callOrder != END {
t.Error("expected no more calls to happen")
}
}

View File

@ -1,8 +1,10 @@
package tusd package tusd_test
import ( import (
"net/http" "net/http"
"testing" "testing"
. "github.com/tus/tusd"
) )
type postStore struct { type postStore struct {

View File

@ -1,8 +1,10 @@
package tusd package tusd_test
import ( import (
"net/http" "net/http"
"testing" "testing"
. "github.com/tus/tusd"
) )
type terminateStore struct { type terminateStore struct {

View File

@ -75,7 +75,6 @@ type UnroutedHandler struct {
dataStore DataStore dataStore DataStore
isBasePathAbs bool isBasePathAbs bool
basePath string basePath string
locks map[string]bool
logger *log.Logger logger *log.Logger
// For each finished upload the corresponding info object will be sent using // For each finished upload the corresponding info object will be sent using
@ -114,7 +113,6 @@ func NewUnroutedHandler(config Config) (*UnroutedHandler, error) {
dataStore: config.DataStore, dataStore: config.DataStore,
basePath: base, basePath: base,
isBasePathAbs: uri.IsAbs(), isBasePathAbs: uri.IsAbs(),
locks: make(map[string]bool),
CompleteUploads: make(chan FileInfo), CompleteUploads: make(chan FileInfo),
logger: logger, logger: logger,
} }
@ -259,6 +257,16 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request)
handler.sendError(w, r, err) handler.sendError(w, r, err)
return return
} }
if locker, ok := handler.dataStore.(LockerDataStore); ok {
if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, err)
return
}
defer locker.UnlockUpload(id)
}
info, err := handler.dataStore.GetInfo(id) info, err := handler.dataStore.GetInfo(id)
if err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
@ -288,17 +296,16 @@ func (handler *UnroutedHandler) HeadFile(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
} }
// PatchFile adds a chunk to an upload. Only allowed if the upload is not // PatchFile adds a chunk to an upload. Only allowed enough space is left.
// locked and enough space is left.
func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) { func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request) {
//Check for presence of application/offset+octet-stream // Check for presence of application/offset+octet-stream
if r.Header.Get("Content-Type") != "application/offset+octet-stream" { if r.Header.Get("Content-Type") != "application/offset+octet-stream" {
handler.sendError(w, r, ErrInvalidContentType) handler.sendError(w, r, ErrInvalidContentType)
return return
} }
//Check for presence of a valid Upload-Offset Header // Check for presence of a valid Upload-Offset Header
offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64) offset, err := strconv.ParseInt(r.Header.Get("Upload-Offset"), 10, 64)
if err != nil || offset < 0 { if err != nil || offset < 0 {
handler.sendError(w, r, ErrInvalidOffset) handler.sendError(w, r, ErrInvalidOffset)
@ -311,20 +318,15 @@ func (handler *UnroutedHandler) PatchFile(w http.ResponseWriter, r *http.Request
return return
} }
// Ensure file is not locked if locker, ok := handler.dataStore.(LockerDataStore); ok {
if _, ok := handler.locks[id]; ok { if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, ErrFileLocked) handler.sendError(w, r, err)
return return
}
defer locker.UnlockUpload(id)
} }
// Lock file for further writes (heads are allowed)
handler.locks[id] = true
// File will be unlocked regardless of an error or success
defer func() {
delete(handler.locks, id)
}()
info, err := handler.dataStore.GetInfo(id) info, err := handler.dataStore.GetInfo(id)
if err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
@ -387,20 +389,15 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
return return
} }
// Ensure file is not locked if locker, ok := handler.dataStore.(LockerDataStore); ok {
if _, ok := handler.locks[id]; ok { if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, ErrFileLocked) handler.sendError(w, r, err)
return return
}
defer locker.UnlockUpload(id)
} }
// Lock file for further writes (heads are allowed)
handler.locks[id] = true
// File will be unlocked regardless of an error or success
defer func() {
delete(handler.locks, id)
}()
info, err := handler.dataStore.GetInfo(id) info, err := handler.dataStore.GetInfo(id)
if err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)
@ -438,20 +435,15 @@ func (handler *UnroutedHandler) DelFile(w http.ResponseWriter, r *http.Request)
return return
} }
// Ensure file is not locked if locker, ok := handler.dataStore.(LockerDataStore); ok {
if _, ok := handler.locks[id]; ok { if err := locker.LockUpload(id); err != nil {
handler.sendError(w, r, ErrFileLocked) handler.sendError(w, r, err)
return return
}
defer locker.UnlockUpload(id)
} }
// Lock file for further writes (heads are allowed)
handler.locks[id] = true
// File will be unlocked regardless of an error or success
defer func() {
delete(handler.locks, id)
}()
err = handler.dataStore.Terminate(id) err = handler.dataStore.Terminate(id)
if err != nil { if err != nil {
handler.sendError(w, r, err) handler.sendError(w, r, err)