fix: use lock on CompleteUploads

This commit is contained in:
Derrick Hammer 2024-03-28 22:18:45 -04:00
parent 9254dc20c8
commit 82532b01eb
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 29 additions and 1 deletions

View File

@ -537,7 +537,35 @@ func (t *TusHandler) worker() {
if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) { if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) {
continue continue
} }
err := t.UploadCompleted(ctx, info.Upload.ID)
hash, ok := info.Upload.MetaData["hash"]
if !ok {
t.logger.Error("Missing hash in metadata")
continue
}
decodedHash, err := encoding.MultihashFromBase64Url(hash)
mapKey := append([]byte{}, decodedHash.HashBytes()...)
mapKey = append(mapKey, []byte(info.Upload.ID)...)
mapKeyStr := string(mapKey)
mutex, ok := t.uploadMutexMap.Load(mapKeyStr)
if !ok {
t.logger.Error("Could not find mutex for upload")
continue
}
mutex.(*sync.Mutex).Lock()
err = t.UploadCompleted(ctx, info.Upload.ID)
exists, _ := t.UploadExists(ctx, decodedHash.HashBytes())
if !exists {
mutex.(*sync.Mutex).Unlock()
if !info.Upload.IsFinal && !info.Upload.IsPartial {
t.logger.Error("Upload not found", zap.String("hash", hex.EncodeToString(decodedHash.HashBytes())))
}
continue
}
mutex.(*sync.Mutex).Unlock()
if err != nil { if err != nil {
t.logger.Error("Could not complete tus upload", zap.Error(err)) t.logger.Error("Could not complete tus upload", zap.Error(err))
continue continue