From aff6e8106cf4f4dacb783c390fb8bb686f1de07f Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 31 Jan 2024 21:27:38 -0500 Subject: [PATCH] refactor: split renter methods to a dedicated renter service --- cmd/portal/main.go | 2 + renter/renter.go | 104 +++++++++++++++++++++++++++++++++++++++++++++ storage/storage.go | 79 +++++++--------------------------- 3 files changed, 121 insertions(+), 64 deletions(-) create mode 100644 renter/renter.go diff --git a/cmd/portal/main.go b/cmd/portal/main.go index 7628506..83e2086 100644 --- a/cmd/portal/main.go +++ b/cmd/portal/main.go @@ -8,6 +8,7 @@ import ( "git.lumeweb.com/LumeWeb/portal/db" _logger "git.lumeweb.com/LumeWeb/portal/logger" "git.lumeweb.com/LumeWeb/portal/protocols" + "git.lumeweb.com/LumeWeb/portal/renter" "git.lumeweb.com/LumeWeb/portal/storage" flag "github.com/spf13/pflag" "github.com/spf13/viper" @@ -63,6 +64,7 @@ func main() { fx.Invoke(initCheckRequiredConfig), fx.Provide(NewIdentity), db.Module, + renter.Module, storage.Module, cron.Module, account.Module, diff --git a/renter/renter.go b/renter/renter.go new file mode 100644 index 0000000..ac416d7 --- /dev/null +++ b/renter/renter.go @@ -0,0 +1,104 @@ +package renter + +import ( + "context" + "errors" + "github.com/spf13/viper" + "go.sia.tech/renterd/api" + busClient "go.sia.tech/renterd/bus/client" + workerClient "go.sia.tech/renterd/worker/client" + "go.uber.org/fx" + "go.uber.org/zap" + "io" + "net/url" +) + +type RenterServiceParams struct { + fx.In + Config *viper.Viper + Logger *zap.Logger +} + +type RenterDefault struct { + busClient *busClient.Client + workerClient *workerClient.Client + config *viper.Viper + logger *zap.Logger +} + +var Module = fx.Module("renter", + fx.Options( + fx.Provide(NewRenterService), + fx.Invoke(func(r *RenterDefault) error { + return r.init() + }), + ), +) + +func NewRenterService(params RenterServiceParams) *RenterDefault { + return &RenterDefault{ + config: params.Config, + logger: params.Logger, + } +} + +func (r *RenterDefault) CreateBucketIfNotExists(bucket string) error { + + _, err := r.busClient.Bucket(context.Background(), bucket) + + if err == nil { + return nil + } + + if err != nil { + if !errors.Is(err, api.ErrBucketNotFound) { + return err + } + } + + err = r.busClient.CreateBucket(context.Background(), bucket, api.CreateBucketOptions{ + Policy: api.BucketPolicy{ + PublicReadAccess: false, + }, + }) + if err != nil { + return err + } + + return nil +} + +func (r *RenterDefault) UploadObject(ctx context.Context, file io.Reader, bucket string, hash string) error { + _, err := r.workerClient.UploadObject(ctx, file, bucket, hash, api.UploadObjectOptions{}) + + if err != nil { + return err + } + + return nil +} + +func (r *RenterDefault) init() error { + addr := r.config.GetString("core.sia.url") + passwd := r.config.GetString("core.sia.key") + + addrURL, err := url.Parse(addr) + + if err != nil { + return err + } + + addrURL.Path = "/api/worker" + + r.workerClient = workerClient.New(addrURL.String(), passwd) + + addrURL.Path = "/api/bus" + + r.busClient = busClient.New(addrURL.String(), passwd) + + return nil +} + +func (r *RenterDefault) GetObject(ctx context.Context, protocol string, hash string, options api.DownloadObjectOptions) (*api.GetObjectResponse, error) { + return r.workerClient.GetObject(ctx, protocol, hash, options) +} diff --git a/storage/storage.go b/storage/storage.go index ebfa331..1dc71d5 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -12,6 +12,7 @@ import ( "git.lumeweb.com/LumeWeb/portal/api/middleware" "git.lumeweb.com/LumeWeb/portal/cron" "git.lumeweb.com/LumeWeb/portal/db/models" + "git.lumeweb.com/LumeWeb/portal/renter" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" @@ -22,15 +23,12 @@ import ( tusd "github.com/tus/tusd/v2/pkg/handler" "github.com/tus/tusd/v2/pkg/s3store" "go.sia.tech/renterd/api" - busClient "go.sia.tech/renterd/bus/client" - workerClient "go.sia.tech/renterd/worker/client" "go.uber.org/fx" "go.uber.org/zap" "gorm.io/gorm" "io" "lukechampine.com/blake3" "net/http" - "net/url" "strings" "time" ) @@ -54,16 +52,15 @@ var Module = fx.Module("storage", ) type StorageServiceDefault struct { - busClient *busClient.Client - workerClient *workerClient.Client - tus *tusd.Handler - tusStore tusd.DataStore - s3Client *s3.Client - config *viper.Viper - logger *zap.Logger - db *gorm.DB - accounts *account.AccountServiceDefault - cron *cron.CronServiceDefault + tus *tusd.Handler + tusStore tusd.DataStore + s3Client *s3.Client + config *viper.Viper + logger *zap.Logger + db *gorm.DB + accounts *account.AccountServiceDefault + cron *cron.CronServiceDefault + renter *renter.RenterDefault } func (s *StorageServiceDefault) Tus() *tusd.Handler { @@ -96,12 +93,12 @@ func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, g return nil, err } - err = s.createBucketIfNotExists(bucket) + err = s.renter.CreateBucketIfNotExists(bucket) if err != nil { return nil, err } - _, err = s.workerClient.UploadObject(context.Background(), file, bucket, hashStr, api.UploadObjectOptions{}) + err = s.renter.UploadObject(context.Background(), file, bucket, hashStr) if err != nil { return nil, err @@ -111,12 +108,12 @@ func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, g } func (s StorageServiceDefault) PutFile(file io.Reader, bucket string, hash []byte) error { hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url() - err = s.createBucketIfNotExists(bucket) + err = s.renter.CreateBucketIfNotExists(bucket) if err != nil { return err } - _, err = s.workerClient.UploadObject(context.Background(), file, bucket, hashStr, api.UploadObjectOptions{}) + err = s.renter.UploadObject(context.Background(), file, bucket, hashStr) if err != nil { return err } @@ -173,24 +170,6 @@ func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadC } func (s *StorageServiceDefault) init() error { - - addr := s.config.GetString("core.sia.url") - passwd := s.config.GetString("core.sia.key") - - addrURL, err := url.Parse(addr) - - if err != nil { - return err - } - - addrURL.Path = "/api/worker" - - s.workerClient = workerClient.New(addrURL.String(), passwd) - - addrURL.Path = "/api/bus" - - s.busClient = busClient.New(addrURL.String(), passwd) - preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) { blankResp := tusd.HTTPResponse{} blankChanges := tusd.FileInfoChanges{} @@ -233,40 +212,12 @@ func (s *StorageServiceDefault) init() error { s.cron.RegisterService(s) - go s.tusWorker() - return nil } func (s *StorageServiceDefault) LoadInitialTasks(cron cron.CronService) error { return nil } -func (s *StorageServiceDefault) createBucketIfNotExists(bucket string) error { - - _, err := s.busClient.Bucket(context.Background(), bucket) - - if err == nil { - return nil - } - - if err != nil { - if !errors.Is(err, api.ErrBucketNotFound) { - return err - } - } - - err = s.busClient.CreateBucket(context.Background(), bucket, api.CreateBucketOptions{ - Policy: api.BucketPolicy{ - PublicReadAccess: false, - }, - }) - if err != nil { - return err - } - - return nil -} - func (s *StorageServiceDefault) FileExists(hash []byte) (bool, models.Upload) { hashStr := hex.EncodeToString(hash) @@ -665,7 +616,7 @@ func (s *StorageServiceDefault) GetFile(hash []byte, start int64) (io.ReadCloser } } - object, err := s.workerClient.GetObject(context.Background(), upload.Protocol, hashStr, api.DownloadObjectOptions{ + object, err := s.renter.GetObject(context.Background(), upload.Protocol, hashStr, api.DownloadObjectOptions{ Range: partialRange, })