From 454deeae216f7d8c2c18b956504705b543a337aa Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 1 Mar 2024 20:42:42 -0500 Subject: [PATCH] refactor: change renter UploadObjectMultipart to use a db table like we do with s3 to track multipart uploads persistently and generically so we can resume later --- db/models/sia_upload.go | 14 ++++++++ renter/renter.go | 80 +++++++++++++++++++++++++++-------------- 2 files changed, 68 insertions(+), 26 deletions(-) create mode 100644 db/models/sia_upload.go diff --git a/db/models/sia_upload.go b/db/models/sia_upload.go new file mode 100644 index 0000000..089c5f8 --- /dev/null +++ b/db/models/sia_upload.go @@ -0,0 +1,14 @@ +package models + +import "gorm.io/gorm" + +func init() { + registerModel(&SiaUpload{}) +} + +type SiaUpload struct { + gorm.Model + UploadID string `gorm:"unique;not null"` + Bucket string `gorm:"not null;index:idx_bucket_key"` + Key string `gorm:"not null;index:idx_bucket_key"` +} diff --git a/renter/renter.go b/renter/renter.go index 356bf2e..26da021 100644 --- a/renter/renter.go +++ b/renter/renter.go @@ -10,6 +10,10 @@ import ( "strconv" "strings" + "git.lumeweb.com/LumeWeb/portal/db/models" + + "gorm.io/gorm" + "git.lumeweb.com/LumeWeb/portal/config" "git.lumeweb.com/LumeWeb/portal/cron" @@ -31,6 +35,7 @@ type RenterServiceParams struct { Config *config.Manager Logger *zap.Logger Cron *cron.CronServiceDefault + Db *gorm.DB } type RenterDefault struct { @@ -39,15 +44,15 @@ type RenterDefault struct { config *config.Manager logger *zap.Logger cron *cron.CronServiceDefault + db *gorm.DB } type MultiPartUploadParams struct { - ReaderFactory ReaderFactory - Bucket string - FileName string - Size uint64 - ExistingUploadID string - UploadIDHandler UploadIDHandler + ReaderFactory ReaderFactory + Bucket string + FileName string + Size uint64 + UploadIDHandler UploadIDHandler } var Module = fx.Module("renter", @@ -64,6 +69,7 @@ func NewRenterService(params RenterServiceParams) *RenterDefault { config: params.Config, logger: params.Logger, cron: params.Cron, + db: params.Db, } } @@ -159,36 +165,54 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi var uploadId string start := uint64(0) - if params.ExistingUploadID != "" { - existing, err := r.busClient.MultipartUploadParts(ctx, bucket, fileName, params.ExistingUploadID, 0, 0) + var siaUpload models.SiaUpload - if err != nil { - return err - } + siaUpload.Bucket = bucket + siaUpload.Key = fileName - uploadId = params.ExistingUploadID - - for _, part := range existing.Parts { - if uint64(part.Size) != slabSize { - break - } - partNumber := part.PartNumber - uploadParts[partNumber-1] = api.MultipartCompletedPart{ - PartNumber: part.PartNumber, - ETag: part.ETag, - } - } - - if len(uploadParts) > 0 { - start = uint64(len(uploadParts)) - 1 + ret := r.db.WithContext(ctx).Model(&siaUpload).First(&siaUpload) + if ret.Error != nil { + if !errors.Is(ret.Error, gorm.ErrRecordNotFound) { + return ret.Error } } else { + uploadId = siaUpload.UploadID + } + + if len(uploadId) > 0 { + existing, err := r.busClient.MultipartUploadParts(ctx, bucket, fileName, uploadId, 0, 0) + + if err != nil { + uploadId = "" + } else { + for _, part := range existing.Parts { + if uint64(part.Size) != slabSize { + break + } + partNumber := part.PartNumber + uploadParts[partNumber-1] = api.MultipartCompletedPart{ + PartNumber: part.PartNumber, + ETag: part.ETag, + } + } + + if len(uploadParts) > 0 { + start = uint64(len(uploadParts)) - 1 + } + } + + } + + if uploadId == "" { upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey}) if err != nil { return err } uploadId = upload.UploadID + if tx := r.db.WithContext(ctx).Model(&siaUpload).Save(&siaUpload); tx.Error != nil { + return tx.Error + } } if idHandler != nil { @@ -265,6 +289,10 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi return err } + if tx := r.db.WithContext(ctx).Delete(&siaUpload); tx.Error != nil { + return tx.Error + } + return nil }