diff --git a/renter/renter.go b/renter/renter.go index ac416d7..cfa0ae8 100644 --- a/renter/renter.go +++ b/renter/renter.go @@ -3,20 +3,29 @@ package renter import ( "context" "errors" + "git.lumeweb.com/LumeWeb/portal/cron" "github.com/spf13/viper" + rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/renterd/api" busClient "go.sia.tech/renterd/bus/client" + "go.sia.tech/renterd/object" workerClient "go.sia.tech/renterd/worker/client" "go.uber.org/fx" "go.uber.org/zap" "io" + "math" "net/url" + "strconv" ) +type ReaderFactory func(start uint, end uint) (io.ReadCloser, error) +type UploadIDHandler func(uploadID string) + type RenterServiceParams struct { fx.In Config *viper.Viper Logger *zap.Logger + Cron *cron.CronServiceDefault } type RenterDefault struct { @@ -24,6 +33,16 @@ type RenterDefault struct { workerClient *workerClient.Client config *viper.Viper logger *zap.Logger + cron *cron.CronServiceDefault +} + +type MultiPartUploadParams struct { + ReaderFactory ReaderFactory + Bucket string + FileName string + Size uint64 + ExistingUploadID string + UploadIDHandler UploadIDHandler } var Module = fx.Module("renter", @@ -43,7 +62,6 @@ func NewRenterService(params RenterServiceParams) *RenterDefault { } func (r *RenterDefault) CreateBucketIfNotExists(bucket string) error { - _, err := r.busClient.Bucket(context.Background(), bucket) if err == nil { @@ -102,3 +120,87 @@ func (r *RenterDefault) init() error { func (r *RenterDefault) GetObject(ctx context.Context, protocol string, hash string, options api.DownloadObjectOptions) (*api.GetObjectResponse, error) { return r.workerClient.GetObject(ctx, protocol, hash, options) } + +func (r *RenterDefault) GetSetting(ctx context.Context, setting string, out any) error { + err := r.busClient.Setting(ctx, setting, out) + + if err != nil { + return err + } + + return nil +} + +func (r *RenterDefault) MultipartUpload(params MultiPartUploadParams) error { + size := params.Size + rf := params.ReaderFactory + bucket := params.Bucket + fileName := params.FileName + ctx := context.Background() + idHandler := params.UploadIDHandler + + var redundancy api.RedundancySettings + + err := r.GetSetting(ctx, "redundancy", redundancy) + if err != nil { + return err + } + + slabSize := uint64(redundancy.MinShards * rhpv2.SectorSize) + parts := uint64(math.Ceil(float64(size) / float64(slabSize))) + uploadParts := make([]api.MultipartCompletedPart, parts) + + upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey}) + if err != nil { + return err + } + + if idHandler != nil { + idHandler(upload.UploadID) + } + + for i := uint64(0); i < parts; i++ { + start := i * slabSize + + end := start + slabSize + if end > size { + end = size + } + + reader, err := rf(uint(start), uint(end)) + next := make(chan struct{}, 0) + defer close(next) + job := r.cron.RetryableTask(cron.RetryableTaskParams{ + Name: fileName + "-part-" + strconv.FormatUint(i, 10), + Function: func() error { + _, err := r.workerClient.UploadMultipartUploadPart(context.Background(), reader, bucket, fileName, upload.UploadID, int(i), api.UploadMultipartUploadPartOptions{}) + if err != nil { + return err + } + + next <- struct{}{} + return nil + }, + Limit: 10, + }) + + _, err = r.cron.CreateJob(job) + if err != nil { + r.logger.Error("failed to create job", zap.Error(err)) + return err + } + + uploadParts[i] = api.MultipartCompletedPart{ + PartNumber: int(i), + } + + <-next + } + + _, err = r.busClient.CompleteMultipartUpload(ctx, bucket, fileName, upload.UploadID, uploadParts) + if err != nil { + return err + } + + return nil +} diff --git a/storage/storage.go b/storage/storage.go index 10f15e9..515c91d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -469,13 +469,20 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error { return err } - reader, err := tusUpload.GetReader(ctx) + readerHash, err := tusUpload.GetReader(ctx) if err != nil { s.logger.Error("Could not get tus file", zap.Error(err)) return err } - hash, byteCount, err := s.GetHash(reader) + defer func(reader io.ReadCloser) { + err := reader.Close() + if err != nil { + s.logger.Error("Could not close reader", zap.Error(err)) + } + }(readerHash) + + hash, byteCount, err := s.GetHash(readerHash) if err != nil { s.logger.Error("Could not compute hash", zap.Error(err)) @@ -494,15 +501,22 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error { return err } - reader, err = tusUpload.GetReader(ctx) + readerMime, err := tusUpload.GetReader(ctx) if err != nil { s.logger.Error("Could not get tus file", zap.Error(err)) return err } + defer func(reader io.ReadCloser) { + err := reader.Close() + if err != nil { + s.logger.Error("Could not close reader", zap.Error(err)) + } + }(readerMime) + var mimeBuf [512]byte - _, err = reader.Read(mimeBuf[:]) + _, err = readerMime.Read(mimeBuf[:]) if err != nil { s.logger.Error("Could not read mime", zap.Error(err)) @@ -518,13 +532,28 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error { return tx.Error } - reader, err = tusUpload.GetReader(ctx) + hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url() + err = s.renter.CreateBucketIfNotExists(upload.Protocol) if err != nil { - s.logger.Error("Could not get tus file", zap.Error(err)) return err } - err = s.PutFile(reader, upload.Protocol, dbHash) + info, err := tusUpload.GetInfo(context.Background()) + if err != nil { + s.logger.Error("Could not get tus info", zap.Error(err)) + return err + } + + err = s.renter.MultipartUpload(renter.MultiPartUploadParams{ + ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) { + rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end) + ctx = context.WithValue(ctx, "range", rangeHeader) + return tusUpload.GetReader(ctx) + }, + Bucket: upload.Protocol, + FileName: hashStr, + Size: uint64(info.Size), + }) if err != nil { s.logger.Error("Could not upload file", zap.Error(err))