From 82532b01ebe13bdd30fa70e638e2259526549964 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Thu, 28 Mar 2024 22:18:45 -0400 Subject: [PATCH] fix: use lock on CompleteUploads --- protocols/s5/tus.go | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/protocols/s5/tus.go b/protocols/s5/tus.go index 766353a..a069751 100644 --- a/protocols/s5/tus.go +++ b/protocols/s5/tus.go @@ -537,7 +537,35 @@ func (t *TusHandler) worker() { if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) { continue } - err := t.UploadCompleted(ctx, info.Upload.ID) + + hash, ok := info.Upload.MetaData["hash"] + if !ok { + t.logger.Error("Missing hash in metadata") + continue + } + + decodedHash, err := encoding.MultihashFromBase64Url(hash) + + mapKey := append([]byte{}, decodedHash.HashBytes()...) + mapKey = append(mapKey, []byte(info.Upload.ID)...) + mapKeyStr := string(mapKey) + mutex, ok := t.uploadMutexMap.Load(mapKeyStr) + if !ok { + t.logger.Error("Could not find mutex for upload") + continue + } + + mutex.(*sync.Mutex).Lock() + err = t.UploadCompleted(ctx, info.Upload.ID) + exists, _ := t.UploadExists(ctx, decodedHash.HashBytes()) + if !exists { + mutex.(*sync.Mutex).Unlock() + if !info.Upload.IsFinal && !info.Upload.IsPartial { + t.logger.Error("Upload not found", zap.String("hash", hex.EncodeToString(decodedHash.HashBytes()))) + } + continue + } + mutex.(*sync.Mutex).Unlock() if err != nil { t.logger.Error("Could not complete tus upload", zap.Error(err)) continue