refactor: add a sync.Map, and lock parallel uploads in a mutex keyed by the hash and tus upload id, check if it already exists, and if so, abort.

This commit is contained in:
Derrick Hammer 2024-03-28 21:20:44 -04:00
parent 0f1360c6df
commit 0679a7cc3b
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 20 additions and 1 deletions

View File

@ -9,6 +9,7 @@ import (
"io"
"log/slog"
"strings"
"sync"
"time"
"git.lumeweb.com/LumeWeb/portal/api/middleware"
@ -55,6 +56,7 @@ type TusHandler struct {
tusStore tusd.DataStore
s3Client *s3.Client
storageProtocol storage.StorageProtocol
uploadMutexMap sync.Map
}
type TusHandlerParams struct {
@ -492,7 +494,24 @@ func (t *TusHandler) worker() {
continue
}
_, err = t.CreateUpload(ctx, decodedHash.HashBytes(), info.Upload.ID, uploaderID, uploaderIP, t.storageProtocol.Name())
mapKey := append(decodedHash.HashBytes(), []byte(info.Upload.ID)...)
if _, ok := t.uploadMutexMap.Load(mapKey); !ok {
t.uploadMutexMap.Store(mapKey, &sync.Mutex{})
}
mutex, _ := t.uploadMutexMap.Load(mapKey)
mutex.(*sync.Mutex).Lock()
exists, _ := t.UploadExists(ctx, decodedHash.HashBytes())
if exists {
t.logger.Debug("Upload already exists", zap.String("hash", hex.EncodeToString(decodedHash.HashBytes())))
mutex.(*sync.Mutex).Unlock()
continue
}
_, err = t.CreateUpload(ctx, mapKey, info.Upload.ID, uploaderID, uploaderIP, t.storageProtocol.Name())
mutex.(*sync.Mutex).Unlock()
if err != nil {
errorResponse.Body = "Could not create tus upload"
info.Upload.StopUpload(errorResponse)