tusd/pkg/memorylocker/memorylocker.go

109 lines
2.7 KiB
Go

// Package memorylocker provides an in-memory locking mechanism.
//
// TODO: Update comment
// When multiple processes are attempting to access an upload, whether it be
// by reading or writing, a synchronization mechanism is required to prevent
// data corruption, especially to ensure correct offset values and the proper
// order of chunks inside a single upload.
//
// MemoryLocker keeps locks in memory, which makes it a simple and cheap
// mechanism. Locks only exist for as long as the MemoryLocker object is
// referenced and are lost when the program exits.
package memorylocker
import (
"context"
"sync"
"github.com/tus/tusd/pkg/handler"
)
// MemoryLocker keeps upload locks in memory, which makes it a simple and
// cheap locking mechanism. Locks only exist for as long as the MemoryLocker
// value is referenced and are lost when the program exits.
type MemoryLocker struct {
// locks holds one entry per currently held lock, keyed by the ID that was
// passed to NewLock.
locks map[string]lockEntry
// mutex guards reads and writes of locks in Lock and Unlock.
mutex sync.RWMutex
}
// lockEntry is the bookkeeping stored in MemoryLocker.locks for a lock that
// is currently held.
type lockEntry struct {
// lockReleased is closed by Unlock, which wakes every goroutine that is
// waiting on it inside Lock.
lockReleased chan struct{}
// requestRelease is the callback supplied by the current holder when it
// called Lock. Lock invokes it when another caller wants the same lock.
requestRelease func()
}
// New returns a ready-to-use in-memory locker that holds no locks yet.
func New() *MemoryLocker {
	locker := new(MemoryLocker)
	// The map must be initialised up front: Lock writes into it, and writing
	// to a nil map panics.
	locker.locks = map[string]lockEntry{}
	return locker
}
// UseIn adds this locker to the passed composer by handing it to
// composer.UseLocker.
func (locker *MemoryLocker) UseIn(composer *handler.StoreComposer) {
composer.UseLocker(locker)
}
// NewLock returns a handler.Lock bound to this locker and the given id.
// It only builds the handle; nothing is acquired until Lock is called on
// the returned value. The returned error is always nil.
func (locker *MemoryLocker) NewLock(id string) (handler.Lock, error) {
	lock := memoryLock{
		locker: locker,
		id:     id,
	}
	return lock, nil
}
// memoryLock is the value returned by MemoryLocker.NewLock. It identifies a
// single lock by ID within the MemoryLocker that created it.
type memoryLock struct {
// locker is the MemoryLocker whose map and mutex this lock operates on.
locker *MemoryLocker
// id is the key under which this lock is stored in locker.locks.
id string
}
// Lock tries to obtain the exclusive lock for this lock's ID.
//
// While another holder is registered under the same ID, Lock invokes that
// holder's requestRelease callback and then blocks until the holder's
// lockReleased channel is closed or ctx is done. If ctx is done first,
// handler.ErrLockTimeout is returned. Once no holder is registered, a new
// entry carrying the caller's requestRelease callback is stored and nil is
// returned.
//
// The holder's callback is invoked on every pass through the wait loop, so
// it can run more than once.
func (lock memoryLock) Lock(ctx context.Context, requestRelease func()) error {
	locker := lock.locker

	// First look under the read lock only.
	locker.mutex.RLock()
	holder, held := locker.locks[lock.id]
	locker.mutex.RUnlock()

	for {
		if held {
			// TODO: Make this channel?
			// TODO: Should we ensure this is only called once?
			holder.requestRelease()

			select {
			case <-ctx.Done():
				return handler.ErrLockTimeout
			case <-holder.lockReleased:
			}
		}

		locker.mutex.Lock()

		// Look again under the write lock: another caller may have taken the
		// lock between our previous check and now.
		holder, held = locker.locks[lock.id]
		if !held {
			// No lock exists, so we can create it.
			locker.locks[lock.id] = lockEntry{
				lockReleased:   make(chan struct{}),
				requestRelease: requestRelease,
			}
			locker.mutex.Unlock()
			return nil
		}

		// The lock was taken in the meantime, so wait again until it is free.
		locker.mutex.Unlock()
	}
}
// Unlock releases the lock stored under this lock's ID and wakes every
// goroutine waiting for it in Lock. If no such lock exists (it was never
// acquired, or it has already been released), Unlock does nothing. It always
// returns nil.
//
// Unlock does not check that the caller is the current holder; it removes
// whatever entry is stored under the ID.
func (lock memoryLock) Unlock() error {
	lock.locker.mutex.Lock()
	entry, ok := lock.locker.locks[lock.id]
	// Delete the lock entry entirely; deleting a missing key is a no-op.
	delete(lock.locker.locks, lock.id)
	lock.locker.mutex.Unlock()

	if !ok {
		// Nothing was held. The zero lockEntry carries a nil channel, and
		// closing a nil channel panics, so return before reaching close.
		return nil
	}

	// Closing the channel (outside the mutex) signals all waiters at once.
	close(entry.lockReleased)
	return nil
}