From 0679a7cc3b052eda5a5b0a21148e22bd7dce2c27 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 28 Mar 2024 21:20:44 -0400 Subject: [PATCH] refactor: add a sync.Map, and lock parallel uploads in a mutex keyed by the hash and tus upload id, check if it already exists, and if so, abort. --- protocols/s5/tus.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/protocols/s5/tus.go b/protocols/s5/tus.go index ac76596..d4a013c 100644 --- a/protocols/s5/tus.go +++ b/protocols/s5/tus.go @@ -9,6 +9,7 @@ import ( "io" "log/slog" "strings" + "sync" "time" "git.lumeweb.com/LumeWeb/portal/api/middleware" @@ -55,6 +56,7 @@ type TusHandler struct { tusStore tusd.DataStore s3Client *s3.Client storageProtocol storage.StorageProtocol + uploadMutexMap sync.Map } type TusHandlerParams struct { @@ -492,7 +494,24 @@ func (t *TusHandler) worker() { continue } - _, err = t.CreateUpload(ctx, decodedHash.HashBytes(), info.Upload.ID, uploaderID, uploaderIP, t.storageProtocol.Name()) + mapKey := append(decodedHash.HashBytes(), []byte(info.Upload.ID)...) + if _, ok := t.uploadMutexMap.Load(mapKey); !ok { + t.uploadMutexMap.Store(mapKey, &sync.Mutex{}) + } + + mutex, _ := t.uploadMutexMap.Load(mapKey) + + mutex.(*sync.Mutex).Lock() + exists, _ := t.UploadExists(ctx, decodedHash.HashBytes()) + + if exists { + t.logger.Debug("Upload already exists", zap.String("hash", hex.EncodeToString(decodedHash.HashBytes()))) + mutex.(*sync.Mutex).Unlock() + continue + } + + _, err = t.CreateUpload(ctx, mapKey, info.Upload.ID, uploaderID, uploaderIP, t.storageProtocol.Name()) + mutex.(*sync.Mutex).Unlock() if err != nil { errorResponse.Body = "Could not create tus upload" info.Upload.StopUpload(errorResponse)