feat: implement initial version of multipart uploads

Derrick Hammer 2024-02-01 02:03:04 -05:00
parent 95cfa393b4
commit 15b527933f
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 139 additions and 8 deletions

@@ -3,20 +3,29 @@ package renter
import (
"context"
"errors"
"git.lumeweb.com/LumeWeb/portal/cron"
"github.com/spf13/viper"
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/renterd/api"
busClient "go.sia.tech/renterd/bus/client"
"go.sia.tech/renterd/object"
workerClient "go.sia.tech/renterd/worker/client"
"go.uber.org/fx"
"go.uber.org/zap"
"io"
"math"
"net/url"
"strconv"
)
// ReaderFactory returns a reader over the byte range [start, end) of the data
// being uploaded.
type ReaderFactory func(start uint, end uint) (io.ReadCloser, error)

// UploadIDHandler receives the ID of the multipart upload once it has been
// created.
type UploadIDHandler func(uploadID string)

type RenterServiceParams struct {
fx.In
Config *viper.Viper
Logger *zap.Logger
Cron *cron.CronServiceDefault
}
type RenterDefault struct {
@@ -24,6 +33,16 @@ type RenterDefault struct {
workerClient *workerClient.Client
config *viper.Viper
logger *zap.Logger
cron *cron.CronServiceDefault
}
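
// MultiPartUploadParams bundles everything MultipartUpload needs to stream a
// file to renterd in parts.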
type MultiPartUploadParams struct {
ReaderFactory ReaderFactory
Bucket string
FileName string
Size uint64
ExistingUploadID string
UploadIDHandler UploadIDHandler
}
var Module = fx.Module("renter",
@@ -43,7 +62,6 @@ func NewRenterService(params RenterServiceParams) *RenterDefault {
}
func (r *RenterDefault) CreateBucketIfNotExists(bucket string) error {
_, err := r.busClient.Bucket(context.Background(), bucket)
if err == nil {
@@ -102,3 +120,87 @@ func (r *RenterDefault) init() error {
func (r *RenterDefault) GetObject(ctx context.Context, protocol string, hash string, options api.DownloadObjectOptions) (*api.GetObjectResponse, error) {
return r.workerClient.GetObject(ctx, protocol, hash, options)
}
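
// GetSetting fetches the named setting from the bus and decodes it into out,
// which must be a pointer.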
func (r *RenterDefault) GetSetting(ctx context.Context, setting string, out any) error {
err := r.busClient.Setting(ctx, setting, out)
if err != nil {
return err
}
return nil
}
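
// MultipartUpload uploads the file to renterd in slab-sized parts: it creates
// a multipart upload, uploads each part sequentially with retries, and then
// completes the upload.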
func (r *RenterDefault) MultipartUpload(params MultiPartUploadParams) error {
size := params.Size
rf := params.ReaderFactory
bucket := params.Bucket
fileName := params.FileName
ctx := context.Background()
idHandler := params.UploadIDHandler
	var redundancy api.RedundancySettings
	// The setting is decoded into the value, so a pointer must be passed.
	err := r.GetSetting(ctx, "redundancy", &redundancy)
	if err != nil {
		return err
	}
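	// Each part spans one slab: MinShards data sectors of SectorSize bytes.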
slabSize := uint64(redundancy.MinShards * rhpv2.SectorSize)
parts := uint64(math.Ceil(float64(size) / float64(slabSize)))
uploadParts := make([]api.MultipartCompletedPart, parts)
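	// A new multipart upload is always created here (ExistingUploadID is not yet
	// honored), and object.NoOpKey skips renterd's per-object encryption.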
upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey})
if err != nil {
return err
}
if idHandler != nil {
idHandler(upload.UploadID)
}
	for i := uint64(0); i < parts; i++ {
		start := i * slabSize
		end := start + slabSize
		if end > size {
			end = size
		}

		// Multipart part numbers are 1-based.
		partNumber := int(i + 1)

		// next gates the loop so parts are uploaded one at a time, in order.
		next := make(chan struct{})
		defer close(next)

		job := r.cron.RetryableTask(cron.RetryableTaskParams{
			Name: fileName + "-part-" + strconv.FormatUint(i, 10),
			Function: func() error {
				// Recreate the reader on every attempt so a retry does not reuse an
				// already-consumed stream.
				reader, err := rf(uint(start), uint(end))
				if err != nil {
					return err
				}
				defer reader.Close()

				_, err = r.workerClient.UploadMultipartUploadPart(context.Background(), reader, bucket, fileName, upload.UploadID, partNumber, api.UploadMultipartUploadPartOptions{})
				if err != nil {
					return err
				}

				next <- struct{}{}
				return nil
			},
			Limit: 10,
		})

		_, err = r.cron.CreateJob(job)
		if err != nil {
			r.logger.Error("failed to create job", zap.Error(err))
			return err
		}

		// The part's ETag from the upload response is currently discarded; it may
		// be needed for CompleteMultipartUpload to succeed.
		uploadParts[i] = api.MultipartCompletedPart{
			PartNumber: partNumber,
		}

		<-next
	}
_, err = r.busClient.CompleteMultipartUpload(ctx, bucket, fileName, upload.UploadID, uploadParts)
if err != nil {
return err
}
return nil
}
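
For reference, here is a minimal sketch of how a caller outside the storage service might drive the new method, uploading a local file in slab-sized parts. It is illustrative only and not part of this commit: the package name, the uploadLocalFile helper, and the import path git.lumeweb.com/LumeWeb/portal/renter (inferred from the cron import above) are assumptions.

package example

import (
	"io"
	"log"
	"os"

	"git.lumeweb.com/LumeWeb/portal/renter"
)

// uploadLocalFile is a hypothetical caller of RenterDefault.MultipartUpload.
func uploadLocalFile(r *renter.RenterDefault, path, bucket, name string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		return err
	}

	if err := r.CreateBucketIfNotExists(bucket); err != nil {
		return err
	}

	return r.MultipartUpload(renter.MultiPartUploadParams{
		Bucket:   bucket,
		FileName: name,
		Size:     uint64(info.Size()),
		ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
			// Serve the requested byte range [start, end) from the local file.
			section := io.NewSectionReader(f, int64(start), int64(end-start))
			return io.NopCloser(section), nil
		},
		UploadIDHandler: func(uploadID string) {
			// A real caller could persist the ID here to support resuming later.
			log.Println("multipart upload started:", uploadID)
		},
	})
}

The ReaderFactory indirection is what lets MultipartUpload request an arbitrary byte range per part (and per retry) instead of holding the whole file in memory.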

@@ -469,13 +469,20 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
return err
}
reader, err := tusUpload.GetReader(ctx)
readerHash, err := tusUpload.GetReader(ctx)
if err != nil {
s.logger.Error("Could not get tus file", zap.Error(err))
return err
}
hash, byteCount, err := s.GetHash(reader)
defer func(reader io.ReadCloser) {
err := reader.Close()
if err != nil {
s.logger.Error("Could not close reader", zap.Error(err))
}
}(readerHash)
hash, byteCount, err := s.GetHash(readerHash)
if err != nil {
s.logger.Error("Could not compute hash", zap.Error(err))
@@ -494,15 +501,22 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
return err
}
reader, err = tusUpload.GetReader(ctx)
readerMime, err := tusUpload.GetReader(ctx)
if err != nil {
s.logger.Error("Could not get tus file", zap.Error(err))
return err
}
defer func(reader io.ReadCloser) {
err := reader.Close()
if err != nil {
s.logger.Error("Could not close reader", zap.Error(err))
}
}(readerMime)
var mimeBuf [512]byte
_, err = reader.Read(mimeBuf[:])
_, err = readerMime.Read(mimeBuf[:])
if err != nil {
s.logger.Error("Could not read mime", zap.Error(err))
@@ -518,13 +532,28 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
return tx.Error
}
reader, err = tusUpload.GetReader(ctx)
	hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
	if err != nil {
		s.logger.Error("Could not encode hash", zap.Error(err))
		return err
	}

	err = s.renter.CreateBucketIfNotExists(upload.Protocol)
	if err != nil {
		s.logger.Error("Could not create bucket", zap.Error(err))
		return err
	}
err = s.PutFile(reader, upload.Protocol, dbHash)
info, err := tusUpload.GetInfo(context.Background())
if err != nil {
s.logger.Error("Could not get tus info", zap.Error(err))
return err
}
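	// Hand the file to the renter as a multipart upload; resuming via
	// ExistingUploadID / UploadIDHandler is not wired up here yet.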
err = s.renter.MultipartUpload(renter.MultiPartUploadParams{
		ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
			// The factory receives an exclusive end offset, while HTTP ranges are
			// inclusive, so the last byte requested is end-1. Use a derived context
			// instead of overwriting the outer one on every call.
			rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end-1)
			rangeCtx := context.WithValue(ctx, "range", rangeHeader)
			return tusUpload.GetReader(rangeCtx)
		},
Bucket: upload.Protocol,
FileName: hashStr,
Size: uint64(info.Size),
})
if err != nil {
s.logger.Error("Could not upload file", zap.Error(err))