refactor: split renter methods to a dedicated renter service

This commit is contained in:
Derrick Hammer 2024-01-31 21:27:38 -05:00
parent 6d34f5b683
commit aff6e8106c
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
3 changed files with 121 additions and 64 deletions

View File

@ -8,6 +8,7 @@ import (
"git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/db"
_logger "git.lumeweb.com/LumeWeb/portal/logger" _logger "git.lumeweb.com/LumeWeb/portal/logger"
"git.lumeweb.com/LumeWeb/portal/protocols" "git.lumeweb.com/LumeWeb/portal/protocols"
"git.lumeweb.com/LumeWeb/portal/renter"
"git.lumeweb.com/LumeWeb/portal/storage" "git.lumeweb.com/LumeWeb/portal/storage"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
@ -63,6 +64,7 @@ func main() {
fx.Invoke(initCheckRequiredConfig), fx.Invoke(initCheckRequiredConfig),
fx.Provide(NewIdentity), fx.Provide(NewIdentity),
db.Module, db.Module,
renter.Module,
storage.Module, storage.Module,
cron.Module, cron.Module,
account.Module, account.Module,

104
renter/renter.go Normal file
View File

@ -0,0 +1,104 @@
package renter
import (
"context"
"errors"
"github.com/spf13/viper"
"go.sia.tech/renterd/api"
busClient "go.sia.tech/renterd/bus/client"
workerClient "go.sia.tech/renterd/worker/client"
"go.uber.org/fx"
"go.uber.org/zap"
"io"
"net/url"
)
type RenterServiceParams struct {
fx.In
Config *viper.Viper
Logger *zap.Logger
}
type RenterDefault struct {
busClient *busClient.Client
workerClient *workerClient.Client
config *viper.Viper
logger *zap.Logger
}
var Module = fx.Module("renter",
fx.Options(
fx.Provide(NewRenterService),
fx.Invoke(func(r *RenterDefault) error {
return r.init()
}),
),
)
func NewRenterService(params RenterServiceParams) *RenterDefault {
return &RenterDefault{
config: params.Config,
logger: params.Logger,
}
}
func (r *RenterDefault) CreateBucketIfNotExists(bucket string) error {
_, err := r.busClient.Bucket(context.Background(), bucket)
if err == nil {
return nil
}
if err != nil {
if !errors.Is(err, api.ErrBucketNotFound) {
return err
}
}
err = r.busClient.CreateBucket(context.Background(), bucket, api.CreateBucketOptions{
Policy: api.BucketPolicy{
PublicReadAccess: false,
},
})
if err != nil {
return err
}
return nil
}
func (r *RenterDefault) UploadObject(ctx context.Context, file io.Reader, bucket string, hash string) error {
_, err := r.workerClient.UploadObject(ctx, file, bucket, hash, api.UploadObjectOptions{})
if err != nil {
return err
}
return nil
}
func (r *RenterDefault) init() error {
addr := r.config.GetString("core.sia.url")
passwd := r.config.GetString("core.sia.key")
addrURL, err := url.Parse(addr)
if err != nil {
return err
}
addrURL.Path = "/api/worker"
r.workerClient = workerClient.New(addrURL.String(), passwd)
addrURL.Path = "/api/bus"
r.busClient = busClient.New(addrURL.String(), passwd)
return nil
}
func (r *RenterDefault) GetObject(ctx context.Context, protocol string, hash string, options api.DownloadObjectOptions) (*api.GetObjectResponse, error) {
return r.workerClient.GetObject(ctx, protocol, hash, options)
}

View File

@ -12,6 +12,7 @@ import (
"git.lumeweb.com/LumeWeb/portal/api/middleware" "git.lumeweb.com/LumeWeb/portal/api/middleware"
"git.lumeweb.com/LumeWeb/portal/cron" "git.lumeweb.com/LumeWeb/portal/cron"
"git.lumeweb.com/LumeWeb/portal/db/models" "git.lumeweb.com/LumeWeb/portal/db/models"
"git.lumeweb.com/LumeWeb/portal/renter"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials"
@ -22,15 +23,12 @@ import (
tusd "github.com/tus/tusd/v2/pkg/handler" tusd "github.com/tus/tusd/v2/pkg/handler"
"github.com/tus/tusd/v2/pkg/s3store" "github.com/tus/tusd/v2/pkg/s3store"
"go.sia.tech/renterd/api" "go.sia.tech/renterd/api"
busClient "go.sia.tech/renterd/bus/client"
workerClient "go.sia.tech/renterd/worker/client"
"go.uber.org/fx" "go.uber.org/fx"
"go.uber.org/zap" "go.uber.org/zap"
"gorm.io/gorm" "gorm.io/gorm"
"io" "io"
"lukechampine.com/blake3" "lukechampine.com/blake3"
"net/http" "net/http"
"net/url"
"strings" "strings"
"time" "time"
) )
@ -54,16 +52,15 @@ var Module = fx.Module("storage",
) )
type StorageServiceDefault struct { type StorageServiceDefault struct {
busClient *busClient.Client tus *tusd.Handler
workerClient *workerClient.Client tusStore tusd.DataStore
tus *tusd.Handler s3Client *s3.Client
tusStore tusd.DataStore config *viper.Viper
s3Client *s3.Client logger *zap.Logger
config *viper.Viper db *gorm.DB
logger *zap.Logger accounts *account.AccountServiceDefault
db *gorm.DB cron *cron.CronServiceDefault
accounts *account.AccountServiceDefault renter *renter.RenterDefault
cron *cron.CronServiceDefault
} }
func (s *StorageServiceDefault) Tus() *tusd.Handler { func (s *StorageServiceDefault) Tus() *tusd.Handler {
@ -96,12 +93,12 @@ func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, g
return nil, err return nil, err
} }
err = s.createBucketIfNotExists(bucket) err = s.renter.CreateBucketIfNotExists(bucket)
if err != nil { if err != nil {
return nil, err return nil, err
} }
_, err = s.workerClient.UploadObject(context.Background(), file, bucket, hashStr, api.UploadObjectOptions{}) err = s.renter.UploadObject(context.Background(), file, bucket, hashStr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -111,12 +108,12 @@ func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, g
} }
func (s StorageServiceDefault) PutFile(file io.Reader, bucket string, hash []byte) error { func (s StorageServiceDefault) PutFile(file io.Reader, bucket string, hash []byte) error {
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url() hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
err = s.createBucketIfNotExists(bucket) err = s.renter.CreateBucketIfNotExists(bucket)
if err != nil { if err != nil {
return err return err
} }
_, err = s.workerClient.UploadObject(context.Background(), file, bucket, hashStr, api.UploadObjectOptions{}) err = s.renter.UploadObject(context.Background(), file, bucket, hashStr)
if err != nil { if err != nil {
return err return err
} }
@ -173,24 +170,6 @@ func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadC
} }
func (s *StorageServiceDefault) init() error { func (s *StorageServiceDefault) init() error {
addr := s.config.GetString("core.sia.url")
passwd := s.config.GetString("core.sia.key")
addrURL, err := url.Parse(addr)
if err != nil {
return err
}
addrURL.Path = "/api/worker"
s.workerClient = workerClient.New(addrURL.String(), passwd)
addrURL.Path = "/api/bus"
s.busClient = busClient.New(addrURL.String(), passwd)
preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) { preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) {
blankResp := tusd.HTTPResponse{} blankResp := tusd.HTTPResponse{}
blankChanges := tusd.FileInfoChanges{} blankChanges := tusd.FileInfoChanges{}
@ -233,40 +212,12 @@ func (s *StorageServiceDefault) init() error {
s.cron.RegisterService(s) s.cron.RegisterService(s)
go s.tusWorker()
return nil return nil
} }
func (s *StorageServiceDefault) LoadInitialTasks(cron cron.CronService) error { func (s *StorageServiceDefault) LoadInitialTasks(cron cron.CronService) error {
return nil return nil
} }
func (s *StorageServiceDefault) createBucketIfNotExists(bucket string) error {
_, err := s.busClient.Bucket(context.Background(), bucket)
if err == nil {
return nil
}
if err != nil {
if !errors.Is(err, api.ErrBucketNotFound) {
return err
}
}
err = s.busClient.CreateBucket(context.Background(), bucket, api.CreateBucketOptions{
Policy: api.BucketPolicy{
PublicReadAccess: false,
},
})
if err != nil {
return err
}
return nil
}
func (s *StorageServiceDefault) FileExists(hash []byte) (bool, models.Upload) { func (s *StorageServiceDefault) FileExists(hash []byte) (bool, models.Upload) {
hashStr := hex.EncodeToString(hash) hashStr := hex.EncodeToString(hash)
@ -665,7 +616,7 @@ func (s *StorageServiceDefault) GetFile(hash []byte, start int64) (io.ReadCloser
} }
} }
object, err := s.workerClient.GetObject(context.Background(), upload.Protocol, hashStr, api.DownloadObjectOptions{ object, err := s.renter.GetObject(context.Background(), upload.Protocol, hashStr, api.DownloadObjectOptions{
Range: partialRange, Range: partialRange,
}) })