refactor: change renter UploadObjectMultipart to use a db table like we do with s3 to track multipart uploads persistently and generically so we can resume later

This commit is contained in:
Derrick Hammer 2024-03-01 20:42:42 -05:00
parent aefe9efaaa
commit 454deeae21
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 68 additions and 26 deletions

14
db/models/sia_upload.go Normal file
View File

@ -0,0 +1,14 @@
package models
import "gorm.io/gorm"
func init() {
registerModel(&SiaUpload{})
}
type SiaUpload struct {
gorm.Model
UploadID string `gorm:"unique;not null"`
Bucket string `gorm:"not null;index:idx_bucket_key"`
Key string `gorm:"not null;index:idx_bucket_key"`
}

View File

@ -10,6 +10,10 @@ import (
"strconv" "strconv"
"strings" "strings"
"git.lumeweb.com/LumeWeb/portal/db/models"
"gorm.io/gorm"
"git.lumeweb.com/LumeWeb/portal/config" "git.lumeweb.com/LumeWeb/portal/config"
"git.lumeweb.com/LumeWeb/portal/cron" "git.lumeweb.com/LumeWeb/portal/cron"
@ -31,6 +35,7 @@ type RenterServiceParams struct {
Config *config.Manager Config *config.Manager
Logger *zap.Logger Logger *zap.Logger
Cron *cron.CronServiceDefault Cron *cron.CronServiceDefault
Db *gorm.DB
} }
type RenterDefault struct { type RenterDefault struct {
@ -39,15 +44,15 @@ type RenterDefault struct {
config *config.Manager config *config.Manager
logger *zap.Logger logger *zap.Logger
cron *cron.CronServiceDefault cron *cron.CronServiceDefault
db *gorm.DB
} }
type MultiPartUploadParams struct { type MultiPartUploadParams struct {
ReaderFactory ReaderFactory ReaderFactory ReaderFactory
Bucket string Bucket string
FileName string FileName string
Size uint64 Size uint64
ExistingUploadID string UploadIDHandler UploadIDHandler
UploadIDHandler UploadIDHandler
} }
var Module = fx.Module("renter", var Module = fx.Module("renter",
@ -64,6 +69,7 @@ func NewRenterService(params RenterServiceParams) *RenterDefault {
config: params.Config, config: params.Config,
logger: params.Logger, logger: params.Logger,
cron: params.Cron, cron: params.Cron,
db: params.Db,
} }
} }
@ -159,36 +165,54 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi
var uploadId string var uploadId string
start := uint64(0) start := uint64(0)
if params.ExistingUploadID != "" { var siaUpload models.SiaUpload
existing, err := r.busClient.MultipartUploadParts(ctx, bucket, fileName, params.ExistingUploadID, 0, 0)
if err != nil { siaUpload.Bucket = bucket
return err siaUpload.Key = fileName
}
uploadId = params.ExistingUploadID ret := r.db.WithContext(ctx).Model(&siaUpload).First(&siaUpload)
if ret.Error != nil {
for _, part := range existing.Parts { if !errors.Is(ret.Error, gorm.ErrRecordNotFound) {
if uint64(part.Size) != slabSize { return ret.Error
break
}
partNumber := part.PartNumber
uploadParts[partNumber-1] = api.MultipartCompletedPart{
PartNumber: part.PartNumber,
ETag: part.ETag,
}
}
if len(uploadParts) > 0 {
start = uint64(len(uploadParts)) - 1
} }
} else { } else {
uploadId = siaUpload.UploadID
}
if len(uploadId) > 0 {
existing, err := r.busClient.MultipartUploadParts(ctx, bucket, fileName, uploadId, 0, 0)
if err != nil {
uploadId = ""
} else {
for _, part := range existing.Parts {
if uint64(part.Size) != slabSize {
break
}
partNumber := part.PartNumber
uploadParts[partNumber-1] = api.MultipartCompletedPart{
PartNumber: part.PartNumber,
ETag: part.ETag,
}
}
if len(uploadParts) > 0 {
start = uint64(len(uploadParts)) - 1
}
}
}
if uploadId == "" {
upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey}) upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey})
if err != nil { if err != nil {
return err return err
} }
uploadId = upload.UploadID uploadId = upload.UploadID
if tx := r.db.WithContext(ctx).Model(&siaUpload).Save(&siaUpload); tx.Error != nil {
return tx.Error
}
} }
if idHandler != nil { if idHandler != nil {
@ -265,6 +289,10 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi
return err return err
} }
if tx := r.db.WithContext(ctx).Delete(&siaUpload); tx.Error != nil {
return tx.Error
}
return nil return nil
} }