diff --git a/renter/renter.go b/renter/renter.go index 766a55b..b87f48d 100644 --- a/renter/renter.go +++ b/renter/renter.go @@ -3,11 +3,9 @@ package renter import ( "context" "errors" - "fmt" "io" "math" "net/url" - "strconv" "strings" "git.lumeweb.com/LumeWeb/portal/db/models" @@ -17,7 +15,6 @@ import ( "git.lumeweb.com/LumeWeb/portal/config" "git.lumeweb.com/LumeWeb/portal/cron" - "github.com/google/uuid" rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/renterd/api" busClient "go.sia.tech/renterd/bus/client" @@ -179,7 +176,18 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi uploadId = siaUpload.UploadID } - if len(uploadId) > 0 { + if len(uploadId) == 0 { + upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey}) + if err != nil { + return err + } + + uploadId = upload.UploadID + siaUpload.UploadID = uploadId + if tx := r.db.WithContext(ctx).Model(&siaUpload).Save(&siaUpload); tx.Error != nil { + return tx.Error + } + } else { // TODO: Switch to using https://github.com/SiaFoundation/renterd/pull/974 after renterd is moved to core/coreutils. We cannot update until then due to WIP work. existing, err := r.busClient.MultipartUploadParts(ctx, bucket, fileName, uploadId, 0, 0) @@ -201,89 +209,37 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi start = uint64(len(uploadParts)) - 1 } } - - } - - if uploadId == "" { - upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey}) - if err != nil { - return err - } - - uploadId = upload.UploadID - siaUpload.UploadID = uploadId - if tx := r.db.WithContext(ctx).Model(&siaUpload).Save(&siaUpload); tx.Error != nil { - return tx.Error - } } if idHandler != nil { idHandler(uploadId) } - for i := start; i < parts; i++ { - start := i * slabSize + reader, err := rf(uint(start*slabSize), uint(0)) + if err != nil { + return err + } - end := start + slabSize - if end > size { - end = size + defer func(reader io.ReadCloser) { + err := reader.Close() + if err != nil { + r.logger.Error("error closing reader", zap.Error(err)) } - nextChan := make(chan string, 0) - errChan := make(chan error, 0) + }(reader) + for i := start; i < parts; i++ { + lr := io.LimitReader(reader, int64(slabSize)) partNumber := int(i + 1) - job := r.cron.RetryableJob(cron.RetryableJobParams{ - Name: fileName + "-part-" + strconv.FormatUint(i, 10), - Function: func() error { - reader, err := rf(uint(start), uint(end)) - defer func(reader io.ReadCloser) { - err := reader.Close() - if err != nil { - r.logger.Error("failed to close reader", zap.Error(err)) - } - }(reader) - - if err != nil { - return err - } - - ret, err := r.workerClient.UploadMultipartUploadPart(context.Background(), reader, bucket, fileName, uploadId, partNumber, api.UploadMultipartUploadPartOptions{}) - if err != nil { - return err - } - - nextChan <- ret.ETag - return nil - }, - Limit: 10, - Error: func(jobID uuid.UUID, jobName string, err error) { - if errors.Is(err, cron.ErrRetryLimitReached) { - r.logger.Error("failed to upload part", zap.String("jobName", jobName), zap.Error(err)) - errChan <- err - } - }, - }) - - _, err = r.cron.CreateJob(job) + ret, err := r.workerClient.UploadMultipartUploadPart(ctx, lr, bucket, fileName, uploadId, partNumber, api.UploadMultipartUploadPartOptions{}) if err != nil { - r.logger.Error("failed to create job", zap.Error(err)) return err } uploadParts[i] = api.MultipartCompletedPart{ PartNumber: partNumber, + ETag: ret.ETag, } - - select { - case err = <-errChan: - return fmt.Errorf("failed to upload part %d: %s", i, err.Error()) - case etag := <-nextChan: - uploadParts[i].ETag = etag - case <-ctx.Done(): - return ctx.Err() - } - } _, err = r.busClient.CompleteMultipartUpload(ctx, bucket, fileName, uploadId, uploadParts)