298 lines
6.9 KiB
Go
298 lines
6.9 KiB
Go
package renter
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"math"
|
|
"net/url"
|
|
"strings"
|
|
|
|
"git.lumeweb.com/LumeWeb/portal/db/models"
|
|
|
|
"gorm.io/gorm"
|
|
|
|
"git.lumeweb.com/LumeWeb/portal/config"
|
|
|
|
"git.lumeweb.com/LumeWeb/portal/cron"
|
|
sia "github.com/LumeWeb/siacentral-api"
|
|
rhpv2 "go.sia.tech/core/rhp/v2"
|
|
"go.sia.tech/renterd/api"
|
|
busClient "go.sia.tech/renterd/bus/client"
|
|
"go.sia.tech/renterd/object"
|
|
workerClient "go.sia.tech/renterd/worker/client"
|
|
"go.uber.org/fx"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type ReaderFactory func(start uint, end uint) (io.ReadCloser, error)
|
|
type UploadIDHandler func(uploadID string)
|
|
|
|
type RenterServiceParams struct {
|
|
fx.In
|
|
Config *config.Manager
|
|
Logger *zap.Logger
|
|
Cron *cron.CronServiceDefault
|
|
Db *gorm.DB
|
|
}
|
|
|
|
type RenterDefault struct {
|
|
busClient *busClient.Client
|
|
workerClient *workerClient.Client
|
|
config *config.Manager
|
|
logger *zap.Logger
|
|
cron *cron.CronServiceDefault
|
|
db *gorm.DB
|
|
}
|
|
|
|
type MultiPartUploadParams struct {
|
|
ReaderFactory ReaderFactory
|
|
Bucket string
|
|
FileName string
|
|
Size uint64
|
|
UploadIDHandler UploadIDHandler
|
|
}
|
|
|
|
var Module = fx.Module("renter",
|
|
fx.Options(
|
|
fx.Provide(NewRenterService),
|
|
fx.Provide(sia.NewSiaClient),
|
|
fx.Provide(NewPriceTracker),
|
|
fx.Invoke(func(r *RenterDefault) error {
|
|
return r.init()
|
|
}),
|
|
fx.Invoke(func(lc fx.Lifecycle, r *PriceTracker) error {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
return r.init()
|
|
|
|
},
|
|
})
|
|
return nil
|
|
}),
|
|
),
|
|
)
|
|
|
|
func NewRenterService(params RenterServiceParams) *RenterDefault {
|
|
return &RenterDefault{
|
|
config: params.Config,
|
|
logger: params.Logger,
|
|
cron: params.Cron,
|
|
db: params.Db,
|
|
}
|
|
}
|
|
|
|
func (r *RenterDefault) CreateBucketIfNotExists(bucket string) error {
|
|
_, err := r.busClient.Bucket(context.Background(), bucket)
|
|
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if !errors.Is(err, api.ErrBucketNotFound) {
|
|
return err
|
|
}
|
|
|
|
err = r.busClient.CreateBucket(context.Background(), bucket, api.CreateBucketOptions{
|
|
Policy: api.BucketPolicy{
|
|
PublicReadAccess: false,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RenterDefault) UploadObject(ctx context.Context, file io.Reader, bucket string, fileName string) error {
|
|
fileName = "/" + strings.TrimLeft(fileName, "/")
|
|
_, err := r.workerClient.UploadObject(ctx, file, bucket, fileName, api.UploadObjectOptions{})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RenterDefault) init() error {
|
|
addr := r.config.Config().Core.Storage.Sia.URL
|
|
passwd := r.config.Config().Core.Storage.Sia.Key
|
|
|
|
addrURL, err := url.Parse(addr)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
addrURL.Path = "/api/worker"
|
|
|
|
r.workerClient = workerClient.New(addrURL.String(), passwd)
|
|
|
|
addrURL.Path = "/api/bus"
|
|
|
|
r.busClient = busClient.New(addrURL.String(), passwd)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RenterDefault) GetObject(ctx context.Context, bucket string, fileName string, options api.DownloadObjectOptions) (*api.GetObjectResponse, error) {
|
|
return r.workerClient.GetObject(ctx, bucket, fileName, options)
|
|
}
|
|
|
|
func (r *RenterDefault) GetSetting(ctx context.Context, setting string, out any) error {
|
|
err := r.busClient.Setting(ctx, setting, out)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RenterDefault) UploadObjectMultipart(ctx context.Context, params *MultiPartUploadParams) error {
|
|
size := params.Size
|
|
rf := params.ReaderFactory
|
|
bucket := params.Bucket
|
|
fileName := params.FileName
|
|
idHandler := params.UploadIDHandler
|
|
|
|
fileName = "/" + strings.TrimLeft(fileName, "/")
|
|
|
|
var redundancy api.RedundancySettings
|
|
|
|
err := r.GetSetting(ctx, "redundancy", &redundancy)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slabSize := uint64(redundancy.MinShards * rhpv2.SectorSize)
|
|
parts := uint64(math.Ceil(float64(size) / float64(slabSize)))
|
|
uploadParts := make([]api.MultipartCompletedPart, parts)
|
|
|
|
var uploadId string
|
|
start := uint64(0)
|
|
|
|
var siaUpload models.SiaUpload
|
|
|
|
siaUpload.Bucket = bucket
|
|
siaUpload.Key = fileName
|
|
|
|
ret := r.db.WithContext(ctx).Model(&siaUpload).First(&siaUpload)
|
|
if ret.Error != nil {
|
|
if !errors.Is(ret.Error, gorm.ErrRecordNotFound) {
|
|
return ret.Error
|
|
}
|
|
} else {
|
|
uploadId = siaUpload.UploadID
|
|
}
|
|
|
|
if len(uploadId) == 0 {
|
|
upload, err := r.busClient.CreateMultipartUpload(ctx, bucket, fileName, api.CreateMultipartOptions{Key: object.NoOpKey})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
uploadId = upload.UploadID
|
|
siaUpload.UploadID = uploadId
|
|
if tx := r.db.WithContext(ctx).Model(&siaUpload).Save(&siaUpload); tx.Error != nil {
|
|
return tx.Error
|
|
}
|
|
} else {
|
|
// TODO: Switch to using https://github.com/SiaFoundation/renterd/pull/974 after renterd is moved to core/coreutils. We cannot update until then due to WIP work.
|
|
existing, err := r.busClient.MultipartUploadParts(ctx, bucket, fileName, uploadId, 0, 0)
|
|
|
|
if err != nil {
|
|
uploadId = ""
|
|
} else {
|
|
for _, part := range existing.Parts {
|
|
if uint64(part.Size) != slabSize {
|
|
break
|
|
}
|
|
partNumber := part.PartNumber
|
|
uploadParts[partNumber-1] = api.MultipartCompletedPart{
|
|
PartNumber: part.PartNumber,
|
|
ETag: part.ETag,
|
|
}
|
|
}
|
|
|
|
if len(uploadParts) > 0 {
|
|
start = uint64(len(uploadParts)) - 1
|
|
}
|
|
}
|
|
}
|
|
|
|
if idHandler != nil {
|
|
idHandler(uploadId)
|
|
}
|
|
|
|
reader, err := rf(uint(start*slabSize), uint(0))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func(reader io.ReadCloser) {
|
|
err := reader.Close()
|
|
if err != nil {
|
|
r.logger.Error("error closing reader", zap.Error(err))
|
|
}
|
|
}(reader)
|
|
|
|
for i := start; i < parts; i++ {
|
|
lr := io.LimitReader(reader, int64(slabSize))
|
|
partNumber := int(i + 1)
|
|
|
|
ret, err := r.workerClient.UploadMultipartUploadPart(ctx, lr, bucket, fileName, uploadId, partNumber, api.UploadMultipartUploadPartOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
uploadParts[i] = api.MultipartCompletedPart{
|
|
PartNumber: partNumber,
|
|
ETag: ret.ETag,
|
|
}
|
|
}
|
|
|
|
_, err = r.busClient.CompleteMultipartUpload(ctx, bucket, fileName, uploadId, uploadParts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if tx := r.db.WithContext(ctx).Delete(&siaUpload); tx.Error != nil {
|
|
return tx.Error
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *RenterDefault) DeleteObject(ctx context.Context, bucket string, fileName string) error {
|
|
return r.workerClient.DeleteObject(ctx, bucket, fileName, api.DeleteObjectOptions{})
|
|
}
|
|
|
|
func (r *RenterDefault) UpdateGougingSettings(ctx context.Context, settings api.GougingSettings) error {
|
|
return r.busClient.UpdateSetting(ctx, api.SettingGouging, settings)
|
|
}
|
|
|
|
func (r *RenterDefault) GougingSettings(ctx context.Context) (api.GougingSettings, error) {
|
|
var settings api.GougingSettings
|
|
err := r.GetSetting(ctx, api.SettingGouging, &settings)
|
|
|
|
if err != nil {
|
|
return api.GougingSettings{}, err
|
|
}
|
|
|
|
return settings, nil
|
|
}
|
|
|
|
func (r *RenterDefault) RedundancySettings(ctx context.Context) (api.RedundancySettings, error) {
|
|
var settings api.RedundancySettings
|
|
err := r.GetSetting(ctx, api.SettingRedundancy, &settings)
|
|
|
|
if err != nil {
|
|
return api.RedundancySettings{}, err
|
|
}
|
|
|
|
return settings, nil
|
|
}
|