2024-02-01 02:27:38 +00:00
package renter
import (
"context"
"errors"
2024-02-02 00:26:23 +00:00
"fmt"
2024-02-17 02:58:34 +00:00
"io"
"math"
"net/url"
"strconv"
2024-02-18 08:30:42 +00:00
"strings"
2024-02-17 02:58:34 +00:00
2024-03-02 01:42:42 +00:00
"git.lumeweb.com/LumeWeb/portal/db/models"
"gorm.io/gorm"
2024-02-22 08:23:06 +00:00
"git.lumeweb.com/LumeWeb/portal/config"
2024-02-01 07:03:04 +00:00
"git.lumeweb.com/LumeWeb/portal/cron"
2024-02-02 00:26:23 +00:00
"github.com/google/uuid"
2024-02-01 07:03:04 +00:00
rhpv2 "go.sia.tech/core/rhp/v2"
2024-02-01 02:27:38 +00:00
"go.sia.tech/renterd/api"
busClient "go.sia.tech/renterd/bus/client"
2024-02-01 07:03:04 +00:00
"go.sia.tech/renterd/object"
2024-02-01 02:27:38 +00:00
workerClient "go.sia.tech/renterd/worker/client"
"go.uber.org/fx"
"go.uber.org/zap"
)
2024-02-01 07:03:04 +00:00
type (
	// ReaderFactory opens a reader over a range of the upload source.
	// UploadObjectMultipart calls it once per part attempt with the part's
	// start offset and an end offset of start plus the part length (clamped
	// to the total size), and closes the returned reader when the attempt
	// finishes.
	ReaderFactory func(start uint, end uint) (io.ReadCloser, error)

	// UploadIDHandler is a callback that receives the ID of the multipart
	// upload that UploadObjectMultipart is about to write parts to.
	UploadIDHandler func(uploadID string)
)
2024-02-01 02:27:38 +00:00
type RenterServiceParams struct {
fx . In
2024-02-22 08:23:06 +00:00
Config * config . Manager
2024-02-01 02:27:38 +00:00
Logger * zap . Logger
2024-02-01 07:03:04 +00:00
Cron * cron . CronServiceDefault
2024-03-02 01:42:42 +00:00
Db * gorm . DB
2024-02-01 02:27:38 +00:00
}
type RenterDefault struct {
busClient * busClient . Client
workerClient * workerClient . Client
2024-02-22 08:23:06 +00:00
config * config . Manager
2024-02-01 02:27:38 +00:00
logger * zap . Logger
2024-02-01 07:03:04 +00:00
cron * cron . CronServiceDefault
2024-03-02 01:42:42 +00:00
db * gorm . DB
2024-02-01 07:03:04 +00:00
}
type MultiPartUploadParams struct {
2024-03-02 01:42:42 +00:00
ReaderFactory ReaderFactory
Bucket string
FileName string
Size uint64
UploadIDHandler UploadIDHandler
2024-02-01 02:27:38 +00:00
}
// Module is the fx module named "renter". It provides NewRenterService and
// invokes init on the resulting *RenterDefault so that the renterd clients
// are created at application start.
var Module = fx.Module("renter",
	fx.Options(
		fx.Provide(NewRenterService),
		// The method expression has the signature func(*RenterDefault) error,
		// which is exactly what a wrapping closure would provide.
		fx.Invoke((*RenterDefault).init),
	),
)
func NewRenterService ( params RenterServiceParams ) * RenterDefault {
return & RenterDefault {
config : params . Config ,
logger : params . Logger ,
2024-02-01 23:33:17 +00:00
cron : params . Cron ,
2024-03-02 01:42:42 +00:00
db : params . Db ,
2024-02-01 02:27:38 +00:00
}
}
func ( r * RenterDefault ) CreateBucketIfNotExists ( bucket string ) error {
_ , err := r . busClient . Bucket ( context . Background ( ) , bucket )
if err == nil {
return nil
}
2024-02-17 02:58:34 +00:00
if ! errors . Is ( err , api . ErrBucketNotFound ) {
return err
2024-02-01 02:27:38 +00:00
}
err = r . busClient . CreateBucket ( context . Background ( ) , bucket , api . CreateBucketOptions {
Policy : api . BucketPolicy {
PublicReadAccess : false ,
} ,
} )
if err != nil {
return err
}
return nil
}
2024-02-17 02:58:34 +00:00
func ( r * RenterDefault ) UploadObject ( ctx context . Context , file io . Reader , bucket string , fileName string ) error {
2024-02-18 08:30:42 +00:00
fileName = "/" + strings . TrimLeft ( fileName , "/" )
2024-02-17 02:58:34 +00:00
_ , err := r . workerClient . UploadObject ( ctx , file , bucket , fileName , api . UploadObjectOptions { } )
2024-02-01 02:27:38 +00:00
if err != nil {
return err
}
return nil
}
func ( r * RenterDefault ) init ( ) error {
2024-02-22 08:23:06 +00:00
addr := r . config . Config ( ) . Core . Sia . URL
passwd := r . config . Config ( ) . Core . Sia . Key
2024-02-01 02:27:38 +00:00
addrURL , err := url . Parse ( addr )
if err != nil {
return err
}
addrURL . Path = "/api/worker"
r . workerClient = workerClient . New ( addrURL . String ( ) , passwd )
addrURL . Path = "/api/bus"
r . busClient = busClient . New ( addrURL . String ( ) , passwd )
return nil
}
2024-02-17 02:58:34 +00:00
func ( r * RenterDefault ) GetObject ( ctx context . Context , bucket string , fileName string , options api . DownloadObjectOptions ) ( * api . GetObjectResponse , error ) {
return r . workerClient . GetObject ( ctx , bucket , fileName , options )
2024-02-01 02:27:38 +00:00
}
2024-02-01 07:03:04 +00:00
// GetSetting asks the bus for the named setting and has the bus client
// populate out with it. The bus client's error, if any, is returned
// unchanged.
func (r *RenterDefault) GetSetting(ctx context.Context, setting string, out any) error {
	return r.busClient.Setting(ctx, setting, out)
}
2024-02-17 02:58:34 +00:00
func ( r * RenterDefault ) UploadObjectMultipart ( ctx context . Context , params * MultiPartUploadParams ) error {
2024-02-01 07:03:04 +00:00
size := params . Size
rf := params . ReaderFactory
bucket := params . Bucket
fileName := params . FileName
idHandler := params . UploadIDHandler
2024-02-18 08:30:42 +00:00
fileName = "/" + strings . TrimLeft ( fileName , "/" )
2024-02-01 07:03:04 +00:00
var redundancy api . RedundancySettings
2024-02-01 23:25:32 +00:00
err := r . GetSetting ( ctx , "redundancy" , & redundancy )
2024-02-01 07:03:04 +00:00
if err != nil {
return err
}
slabSize := uint64 ( redundancy . MinShards * rhpv2 . SectorSize )
parts := uint64 ( math . Ceil ( float64 ( size ) / float64 ( slabSize ) ) )
uploadParts := make ( [ ] api . MultipartCompletedPart , parts )
2024-02-17 11:37:45 +00:00
var uploadId string
start := uint64 ( 0 )
2024-03-02 01:42:42 +00:00
var siaUpload models . SiaUpload
2024-02-17 11:37:45 +00:00
2024-03-02 01:42:42 +00:00
siaUpload . Bucket = bucket
siaUpload . Key = fileName
ret := r . db . WithContext ( ctx ) . Model ( & siaUpload ) . First ( & siaUpload )
if ret . Error != nil {
if ! errors . Is ( ret . Error , gorm . ErrRecordNotFound ) {
return ret . Error
2024-02-17 11:37:45 +00:00
}
2024-03-02 01:42:42 +00:00
} else {
uploadId = siaUpload . UploadID
}
2024-02-17 11:37:45 +00:00
2024-03-02 01:42:42 +00:00
if len ( uploadId ) > 0 {
2024-03-05 17:59:52 +00:00
// TODO: Switch to using https://github.com/SiaFoundation/renterd/pull/974 after renterd is moved to core/coreutils. We cannot update until then due to WIP work.
2024-03-02 01:42:42 +00:00
existing , err := r . busClient . MultipartUploadParts ( ctx , bucket , fileName , uploadId , 0 , 0 )
2024-02-17 11:37:45 +00:00
2024-03-02 01:42:42 +00:00
if err != nil {
uploadId = ""
} else {
for _ , part := range existing . Parts {
if uint64 ( part . Size ) != slabSize {
break
}
partNumber := part . PartNumber
uploadParts [ partNumber - 1 ] = api . MultipartCompletedPart {
PartNumber : part . PartNumber ,
ETag : part . ETag ,
}
2024-02-17 11:37:45 +00:00
}
2024-03-02 01:42:42 +00:00
if len ( uploadParts ) > 0 {
start = uint64 ( len ( uploadParts ) ) - 1
2024-02-17 11:37:45 +00:00
}
}
2024-03-02 01:42:42 +00:00
}
if uploadId == "" {
2024-02-17 11:37:45 +00:00
upload , err := r . busClient . CreateMultipartUpload ( ctx , bucket , fileName , api . CreateMultipartOptions { Key : object . NoOpKey } )
if err != nil {
return err
}
uploadId = upload . UploadID
2024-03-09 19:45:31 +00:00
siaUpload . UploadID = uploadId
2024-03-02 01:42:42 +00:00
if tx := r . db . WithContext ( ctx ) . Model ( & siaUpload ) . Save ( & siaUpload ) ; tx . Error != nil {
return tx . Error
}
2024-02-01 07:03:04 +00:00
}
if idHandler != nil {
2024-02-17 11:37:45 +00:00
idHandler ( uploadId )
2024-02-01 07:03:04 +00:00
}
2024-02-17 11:37:45 +00:00
for i := start ; i < parts ; i ++ {
2024-02-01 07:03:04 +00:00
start := i * slabSize
end := start + slabSize
if end > size {
end = size
}
2024-02-02 02:14:44 +00:00
nextChan := make ( chan string , 0 )
2024-02-02 00:26:23 +00:00
errChan := make ( chan error , 0 )
2024-02-02 00:02:24 +00:00
2024-02-02 00:32:06 +00:00
partNumber := int ( i + 1 )
2024-02-25 12:47:43 +00:00
job := r . cron . RetryableJob ( cron . RetryableJobParams {
2024-02-01 07:03:04 +00:00
Name : fileName + "-part-" + strconv . FormatUint ( i , 10 ) ,
Function : func ( ) error {
2024-02-02 00:02:24 +00:00
reader , err := rf ( uint ( start ) , uint ( end ) )
defer func ( reader io . ReadCloser ) {
err := reader . Close ( )
if err != nil {
r . logger . Error ( "failed to close reader" , zap . Error ( err ) )
}
} ( reader )
if err != nil {
return err
}
2024-02-17 11:37:45 +00:00
ret , err := r . workerClient . UploadMultipartUploadPart ( context . Background ( ) , reader , bucket , fileName , uploadId , partNumber , api . UploadMultipartUploadPartOptions { } )
2024-02-01 07:03:04 +00:00
if err != nil {
return err
}
2024-02-02 02:14:44 +00:00
nextChan <- ret . ETag
2024-02-01 07:03:04 +00:00
return nil
} ,
Limit : 10 ,
2024-02-02 00:26:23 +00:00
Error : func ( jobID uuid . UUID , jobName string , err error ) {
if errors . Is ( err , cron . ErrRetryLimitReached ) {
r . logger . Error ( "failed to upload part" , zap . String ( "jobName" , jobName ) , zap . Error ( err ) )
errChan <- err
}
} ,
2024-02-01 07:03:04 +00:00
} )
_ , err = r . cron . CreateJob ( job )
if err != nil {
r . logger . Error ( "failed to create job" , zap . Error ( err ) )
return err
}
uploadParts [ i ] = api . MultipartCompletedPart {
2024-02-02 00:35:05 +00:00
PartNumber : partNumber ,
2024-02-01 07:03:04 +00:00
}
2024-02-02 00:26:23 +00:00
select {
case err = <- errChan :
return fmt . Errorf ( "failed to upload part %d: %s" , i , err . Error ( ) )
2024-02-02 02:14:44 +00:00
case etag := <- nextChan :
uploadParts [ i ] . ETag = etag
2024-02-17 11:37:45 +00:00
case <- ctx . Done ( ) :
return ctx . Err ( )
2024-02-02 00:26:23 +00:00
}
2024-02-01 07:03:04 +00:00
}
2024-02-17 11:37:45 +00:00
_ , err = r . busClient . CompleteMultipartUpload ( ctx , bucket , fileName , uploadId , uploadParts )
2024-02-01 07:03:04 +00:00
if err != nil {
return err
}
2024-03-02 01:42:42 +00:00
if tx := r . db . WithContext ( ctx ) . Delete ( & siaUpload ) ; tx . Error != nil {
return tx . Error
}
2024-02-01 07:03:04 +00:00
return nil
}
2024-02-17 02:59:02 +00:00
// DeleteObject asks the worker to delete the object stored in bucket under
// fileName, using default delete options. The worker client's error is
// returned unchanged.
func (r *RenterDefault) DeleteObject(ctx context.Context, bucket string, fileName string) error {
	var opts api.DeleteObjectOptions
	return r.workerClient.DeleteObject(ctx, bucket, fileName, opts)
}