// File: portal/protocols/s5/locker.go
// (Repository web-view header: 134 lines, 2.5 KiB, Go.)

package s5
import (
"context"
"os"
"sync"
"time"
"git.lumeweb.com/LumeWeb/portal/db/models"
tusd "github.com/tus/tusd/v2/pkg/handler"
"go.uber.org/zap"
"gorm.io/gorm"
)
// Compile-time assertions that the types below implement the tusd locking
// interfaces.
var _ tusd.Locker = (*MySQLLocker)(nil)
var _ tusd.Lock = (*Lock)(nil)
// MySQLLocker is a tusd.Locker whose locks are backed by records stored
// through a gorm database handle.
type MySQLLocker struct {
	// AcquirerPollInterval is copied into every Lock created by NewLock and is
	// the delay between attempts to acquire a lock that is currently busy.
	AcquirerPollInterval time.Duration

	// HolderPollInterval is copied into every Lock created by NewLock and is
	// the period at which a held lock checks whether its release was requested.
	HolderPollInterval time.Duration

	// db is the database handle passed to every lock-record operation.
	db *gorm.DB

	// logger receives error reports from lock operations.
	logger *zap.Logger
}
// Lock is a single tusd.Lock handed out by MySQLLocker.NewLock.
type Lock struct {
	// locker is the MySQLLocker that created this lock; it supplies the
	// database handle and the logger.
	locker *MySQLLocker

	// id is the identifier passed to NewLock.
	id string

	// holderPollInterval is the period of the release-request poll started
	// after the lock has been acquired.
	holderPollInterval time.Duration

	// acquirerPollInterval is the delay between acquisition attempts.
	acquirerPollInterval time.Duration

	// stopHolderPoll is closed by Unlock to terminate the holder poll goroutine.
	stopHolderPoll chan struct{}

	// lockRecord is the model on which all database operations are performed.
	lockRecord models.TusLock

	// once ensures stopHolderPoll is closed at most one time.
	once sync.Once
}
// NewMySQLLocker builds a MySQLLocker around db and logger, with a holder
// poll interval of 5 seconds and an acquirer poll interval of 2 seconds.
func NewMySQLLocker(db *gorm.DB, logger *zap.Logger) *MySQLLocker {
	locker := new(MySQLLocker)
	locker.db = db
	locker.logger = logger
	locker.AcquirerPollInterval = 2 * time.Second
	locker.HolderPollInterval = 5 * time.Second
	return locker
}
// released invokes Released on the lock record using the locker's database
// handle. A failure is logged through the locker's logger and then returned
// to the caller; on success nil is returned.
func (l *Lock) released() error {
	if relErr := l.lockRecord.Released(l.locker.db); relErr != nil {
		l.locker.logger.Error("Failed to release lock", zap.Error(relErr))
		return relErr
	}

	return nil
}
func (l *Lock) Lock(ctx context.Context, requestUnlock func()) error {
db := l.locker.db
for {
err := l.lockRecord.TryLock(db, ctx)
if err == nil {
break
}
if err != models.ErrTusLockBusy {
return err
}
err = l.lockRecord.RequestRelease(db)
if err != nil {
return err
}
select {
case <-ctx.Done():
2024-01-20 16:06:50 +00:00
err := l.released()
if err != nil {
return err
}
// Context expired, so we return a timeout
return tusd.ErrLockTimeout
case <-time.After(l.acquirerPollInterval):
// Continue with the next attempt after a short delay
continue
}
}
go func() {
2024-01-20 16:16:54 +00:00
ticker := time.NewTicker(l.holderPollInterval)
defer ticker.Stop()
for {
select {
case <-l.stopHolderPoll:
return
2024-01-20 16:16:54 +00:00
case <-ticker.C:
requested, err := l.lockRecord.IsReleaseRequested(db)
2024-01-20 16:16:54 +00:00
if err != nil {
// Handle error
continue
}
if requested {
requestUnlock()
return
}
}
}
}()
return nil
}
// Unlock signals the holder poll goroutine to stop (at most once, however
// many times Unlock is called) and then deletes the lock record, returning
// the result of that deletion.
func (l *Lock) Unlock() error {
	stopPolling := func() { close(l.stopHolderPoll) }
	l.once.Do(stopPolling)

	db := l.locker.db
	return l.lockRecord.Delete(db)
}
// NewLock creates a not-yet-acquired Lock for id. The lock inherits the
// locker's poll intervals, and its record is stamped with the current process
// ID, the current time as AcquiredAt, and an ExpiresAt 30 minutes later.
// The returned error is always nil.
func (m *MySQLLocker) NewLock(id string) (tusd.Lock, error) {
	// lockTTL is the lifetime written into the record's ExpiresAt field.
	const lockTTL = 30 * time.Minute

	// Read the clock once so ExpiresAt is exactly AcquiredAt + lockTTL.
	now := time.Now()

	return &Lock{
		locker:               m,
		id:                   id,
		holderPollInterval:   m.HolderPollInterval,
		acquirerPollInterval: m.AcquirerPollInterval,
		stopHolderPoll:       make(chan struct{}),
		lockRecord: models.TusLock{
			LockId:     id,
			HolderPID:  os.Getpid(),
			AcquiredAt: now,
			ExpiresAt:  now.Add(lockTTL),
		},
	}, nil
}