refactor: simplify sia upload to not try to parallel process as it corrupts data. We will come back to this optimization in the future.

This commit is contained in:
Derrick Hammer 2024-03-10 11:54:32 -04:00
parent 6c60dae743
commit 736dc8aa9d
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 25 additions and 69 deletions

View File

@ -3,11 +3,9 @@ package renter
import (
"context"
"errors"
"fmt"
"io"
"math"
"net/url"
"strconv"
"strings"
"git.lumeweb.com/LumeWeb/portal/db/models"
@ -17,7 +15,6 @@ import (
"git.lumeweb.com/LumeWeb/portal/config"
"git.lumeweb.com/LumeWeb/portal/cron"
"github.com/google/uuid"
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/renterd/api"
busClient "go.sia.tech/renterd/bus/client"
@ -179,7 +176,18 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi
uploadId = siaUpload.UploadID
}
if len(uploadId) > 0 {
if len(uploadId) == 0 {
upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey})
if err != nil {
return err
}
uploadId = upload.UploadID
siaUpload.UploadID = uploadId
if tx := r.db.WithContext(ctx).Model(&siaUpload).Save(&siaUpload); tx.Error != nil {
return tx.Error
}
} else {
// TODO: Switch to using https://github.com/SiaFoundation/renterd/pull/974 after renterd is moved to core/coreutils. We cannot update until then due to WIP work.
existing, err := r.busClient.MultipartUploadParts(ctx, bucket, fileName, uploadId, 0, 0)
@ -201,89 +209,37 @@ func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *Multi
start = uint64(len(uploadParts)) - 1
}
}
}
if uploadId == "" {
upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey})
if err != nil {
return err
}
uploadId = upload.UploadID
siaUpload.UploadID = uploadId
if tx := r.db.WithContext(ctx).Model(&siaUpload).Save(&siaUpload); tx.Error != nil {
return tx.Error
}
}
if idHandler != nil {
idHandler(uploadId)
}
for i := start; i < parts; i++ {
start := i * slabSize
end := start + slabSize
if end > size {
end = size
reader, err := rf(uint(start*slabSize), uint(0))
if err != nil {
return err
}
nextChan := make(chan string, 0)
errChan := make(chan error, 0)
partNumber := int(i + 1)
job := r.cron.RetryableJob(cron.RetryableJobParams{
Name: fileName + "-part-" + strconv.FormatUint(i, 10),
Function: func() error {
reader, err := rf(uint(start), uint(end))
defer func(reader io.ReadCloser) {
err := reader.Close()
if err != nil {
r.logger.Error("failed to close reader", zap.Error(err))
r.logger.Error("error closing reader", zap.Error(err))
}
}(reader)
if err != nil {
return err
}
for i := start; i < parts; i++ {
lr := io.LimitReader(reader, int64(slabSize))
partNumber := int(i + 1)
ret, err := r.workerClient.UploadMultipartUploadPart(context.Background(), reader, bucket, fileName, uploadId, partNumber, api.UploadMultipartUploadPartOptions{})
ret, err := r.workerClient.UploadMultipartUploadPart(ctx, lr, bucket, fileName, uploadId, partNumber, api.UploadMultipartUploadPartOptions{})
if err != nil {
return err
}
nextChan <- ret.ETag
return nil
},
Limit: 10,
Error: func(jobID uuid.UUID, jobName string, err error) {
if errors.Is(err, cron.ErrRetryLimitReached) {
r.logger.Error("failed to upload part", zap.String("jobName", jobName), zap.Error(err))
errChan <- err
}
},
})
_, err = r.cron.CreateJob(job)
if err != nil {
r.logger.Error("failed to create job", zap.Error(err))
return err
}
uploadParts[i] = api.MultipartCompletedPart{
PartNumber: partNumber,
ETag: ret.ETag,
}
select {
case err = <-errChan:
return fmt.Errorf("failed to upload part %d: %s", i, err.Error())
case etag := <-nextChan:
uploadParts[i].ETag = etag
case <-ctx.Done():
return ctx.Err()
}
}
_, err = r.busClient.CompleteMultipartUpload(ctx, bucket, fileName, uploadId, uploadParts)