From e034e1d54ed4f31290c761a4c393ff65912fa93a Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Mon, 29 Jan 2024 15:11:57 -0500 Subject: [PATCH] refactor: restructure s5 protocol/api to use new fx module and new library structure. Also move the proto/api to its own package for organization --- api/api.go | 5 +- api/s5/http.go | 48 +++++---- api/{ => s5}/s5.go | 23 +++-- protocols/protocols.go | 5 +- protocols/registry/registry.go | 2 +- protocols/{ => s5}/s5.go | 174 ++++++++++++++++++--------------- 6 files changed, 145 insertions(+), 112 deletions(-) rename api/{ => s5}/s5.go (93%) rename protocols/{ => s5}/s5.go (66%) diff --git a/api/api.go b/api/api.go index cf3ee59..d62e635 100644 --- a/api/api.go +++ b/api/api.go @@ -3,6 +3,7 @@ package api import ( "context" "git.lumeweb.com/LumeWeb/portal/api/registry" + "git.lumeweb.com/LumeWeb/portal/api/s5" "github.com/samber/lo" "github.com/spf13/viper" "go.uber.org/fx" @@ -11,8 +12,8 @@ import ( func RegisterApis() { registry.Register(registry.APIEntry{ Key: "s5", - Module: S5Module, - InitFunc: InitS5Api, + Module: s5.Module, + InitFunc: s5.InitAPI, }) } diff --git a/api/s5/http.go b/api/s5/http.go index 9d51028..b561064 100644 --- a/api/s5/http.go +++ b/api/s5/http.go @@ -10,15 +10,16 @@ import ( "errors" "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - s5interfaces "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/metadata" - s5protocol "git.lumeweb.com/LumeWeb/libs5-go/protocol" - s5storage "git.lumeweb.com/LumeWeb/libs5-go/storage" + libs5node "git.lumeweb.com/LumeWeb/libs5-go/node" + libs5protocol "git.lumeweb.com/LumeWeb/libs5-go/protocol" + libs5service "git.lumeweb.com/LumeWeb/libs5-go/service" + libs5storage "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/portal/account" "git.lumeweb.com/LumeWeb/portal/api/middleware" "git.lumeweb.com/LumeWeb/portal/db/models" - "git.lumeweb.com/LumeWeb/portal/protocols" + "git.lumeweb.com/LumeWeb/portal/protocols/s5" "git.lumeweb.com/LumeWeb/portal/storage" emailverifier "github.com/AfterShip/email-verifier" "github.com/samber/lo" @@ -83,7 +84,7 @@ type HttpHandler struct { storage *storage.StorageServiceImpl db *gorm.DB accounts *account.AccountServiceImpl - protocol *protocols.S5Protocol + protocol *s5.S5Protocol } type HttpHandlerParams struct { @@ -94,7 +95,7 @@ type HttpHandlerParams struct { Storage *storage.StorageServiceImpl Db *gorm.DB Accounts *account.AccountServiceImpl - Protocol *protocols.S5Protocol + Protocol *s5.S5Protocol } type HttpHandlerResult struct { @@ -955,7 +956,7 @@ func (h *HttpHandler) DebugDownloadUrls(jc jape.Context) { node := h.getNode() - dlUriProvider := s5storage.NewStorageLocationProvider(node, &decodedCid.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge) + dlUriProvider := h.newStorageLocationProvider(&decodedCid.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge) err = dlUriProvider.Start() if err != nil { @@ -971,7 +972,7 @@ func (h *HttpHandler) DebugDownloadUrls(jc jape.Context) { return } - locations, err := node.GetCachedStorageLocations(&decodedCid.Hash, []types.StorageLocationType{ + locations, err := node.Services().Storage().GetCachedStorageLocations(&decodedCid.Hash, []types.StorageLocationType{ types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge, }) if err != nil { @@ -980,7 +981,7 @@ func (h *HttpHandler) DebugDownloadUrls(jc jape.Context) { return } - availableNodes := lo.Keys[string, s5interfaces.StorageLocation](locations) + availableNodes := lo.Keys[string, libs5storage.StorageLocation](locations) availableNodesIds := make([]*encoding.NodeId, len(availableNodes)) @@ -1072,7 +1073,7 @@ func (h *HttpHandler) RegistrySet(jc jape.Context) { return } - entry := s5protocol.NewSignedRegistryEntry(pk, request.Revision, data, signature) + entry := libs5protocol.NewSignedRegistryEntry(pk, request.Revision, data, signature) err = h.getNode().Services().Registry().Set(entry, false, nil) if jc.Check("error setting registry entry", err) != nil { @@ -1140,7 +1141,7 @@ func (h *HttpHandler) RegistrySubscription(jc jape.Context) { break } - off, err := h.getNode().Services().Registry().Listen(sre, func(entry s5interfaces.SignedRegistryEntry) { + off, err := h.getNode().Services().Registry().Listen(sre, func(entry libs5protocol.SignedRegistryEntry) { encoded, err := msgpack.Marshal(entry) if err != nil { h.logger.Error("error encoding entry", zap.Error(err)) @@ -1162,7 +1163,7 @@ func (h *HttpHandler) RegistrySubscription(jc jape.Context) { } } -func (h *HttpHandler) getNode() s5interfaces.Node { +func (h *HttpHandler) getNode() *libs5node.Node { return h.protocol.Node() } @@ -1180,7 +1181,7 @@ func (h *HttpHandler) DownloadBlob(jc jape.Context) { return } - dlUriProvider := s5storage.NewStorageLocationProvider(h.getNode(), &cidDecoded.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge) + dlUriProvider := h.newStorageLocationProvider(&cidDecoded.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge) err = dlUriProvider.Start() @@ -1234,7 +1235,7 @@ func (h *HttpHandler) DebugStorageLocations(jc jape.Context) { } } - dlUriProvider := s5storage.NewStorageLocationProvider(h.getNode(), decodedHash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge) + dlUriProvider := h.newStorageLocationProvider(decodedHash, typeIntList...) err = dlUriProvider.Start() if jc.Check("error starting search", err) != nil { @@ -1246,12 +1247,12 @@ func (h *HttpHandler) DebugStorageLocations(jc jape.Context) { return } - locations, err := h.getNode().GetCachedStorageLocations(decodedHash, typeIntList) + locations, err := h.getNode().Services().Storage().GetCachedStorageLocations(decodedHash, typeIntList) if jc.Check("error getting cached locations", err) != nil { return } - availableNodes := lo.Keys[string, s5interfaces.StorageLocation](locations) + availableNodes := lo.Keys[string, libs5storage.StorageLocation](locations) availableNodesIds := make([]*encoding.NodeId, len(availableNodes)) for i, nodeIdStr := range availableNodes { @@ -1319,7 +1320,7 @@ func (h *HttpHandler) DownloadMetadata(jc jape.Context) { return } - meta, err := h.getNode().GetMetadataByCID(cidDecoded) + meta, err := h.getNode().Services().Storage().GetMetadataByCID(cidDecoded) if jc.Check("error getting metadata", err) != nil { h.logger.Error("error getting metadata", zap.Error(err)) @@ -1379,6 +1380,19 @@ func (h *HttpHandler) DownloadFile(jc jape.Context) { http.ServeContent(jc.ResponseWriter, jc.Request, file.Name(), file.Modtime(), file) } +func (h *HttpHandler) newStorageLocationProvider(hash *encoding.Multihash, types ...types.StorageLocationType) libs5storage.StorageLocationProvider { + return libs5storage.NewStorageLocationProvider(libs5storage.StorageLocationProviderParams{ + Services: h.getNode().Services(), + Hash: hash, + LocationTypes: types, + ServiceParams: libs5service.ServiceParams{ + Logger: h.logger, + Config: h.getNode().Config(), + Db: h.getNode().Db(), + }, + }) +} + func setAuthCookie(jwt string, jc jape.Context) { authCookie := http.Cookie{ Name: "s5-auth-token", diff --git a/api/s5.go b/api/s5/s5.go similarity index 93% rename from api/s5.go rename to api/s5/s5.go index 22697e6..4917a59 100644 --- a/api/s5.go +++ b/api/s5/s5.go @@ -1,4 +1,4 @@ -package api +package s5 import ( "context" @@ -7,9 +7,8 @@ import ( "git.lumeweb.com/LumeWeb/portal/account" "git.lumeweb.com/LumeWeb/portal/api/middleware" "git.lumeweb.com/LumeWeb/portal/api/registry" - "git.lumeweb.com/LumeWeb/portal/api/s5" - "git.lumeweb.com/LumeWeb/portal/protocols" protoRegistry "git.lumeweb.com/LumeWeb/portal/protocols/registry" + "git.lumeweb.com/LumeWeb/portal/protocols/s5" "git.lumeweb.com/LumeWeb/portal/storage" "github.com/rs/cors" "github.com/spf13/viper" @@ -29,18 +28,18 @@ type S5API struct { accounts *account.AccountServiceImpl storage *storage.StorageServiceImpl protocols []protoRegistry.Protocol - httpHandler s5.HttpHandler - protocol *protocols.S5Protocol + httpHandler HttpHandler + protocol *s5.S5Protocol } -type S5ApiParams struct { +type APIParams struct { fx.In Config *viper.Viper Identity ed25519.PrivateKey Accounts *account.AccountServiceImpl Storage *storage.StorageServiceImpl Protocols []protoRegistry.Protocol `group:"protocol"` - HttpHandler s5.HttpHandler + HttpHandler HttpHandler } type S5ApiResult struct { @@ -49,7 +48,7 @@ type S5ApiResult struct { S5API *S5API } -func NewS5(params S5ApiParams) (S5ApiResult, error) { +func NewS5(params APIParams) (S5ApiResult, error) { api := &S5API{ config: params.Config, identity: params.Identity, @@ -64,13 +63,13 @@ func NewS5(params S5ApiParams) (S5ApiResult, error) { }, nil } -func InitS5Api(api *S5API) error { +func InitAPI(api *S5API) error { return api.Init() } -var S5Module = fx.Module("s5_api", +var Module = fx.Module("s5_api", fx.Provide(NewS5), - fx.Provide(s5.NewHttpHandler), + fx.Provide(NewHttpHandler), ) func (s *S5API) Init() error { @@ -79,7 +78,7 @@ func (s *S5API) Init() error { return fmt.Errorf("s5 protocol not found") } - s5protocolInstance := s5protocol.(*protocols.S5Protocol) + s5protocolInstance := s5protocol.(*s5.S5Protocol) s.protocol = s5protocolInstance router := s5protocolInstance.Node().Services().HTTP().GetHttpRouter(getRoutes(s)) middleware.RegisterProtocolSubdomain(s.config, router, "s5") diff --git a/protocols/protocols.go b/protocols/protocols.go index 2f49703..2bfba0b 100644 --- a/protocols/protocols.go +++ b/protocols/protocols.go @@ -3,6 +3,7 @@ package protocols import ( "context" "git.lumeweb.com/LumeWeb/portal/protocols/registry" + "git.lumeweb.com/LumeWeb/portal/protocols/s5" "github.com/samber/lo" "github.com/spf13/viper" "go.uber.org/fx" @@ -11,8 +12,8 @@ import ( func RegisterProtocols() { registry.Register(registry.ProtocolEntry{ Key: "s5", - Module: S5ProtocolModule, - InitFunc: InitS5Protocol, + Module: s5.ProtocolModule, + InitFunc: s5.InitProtocol, }) } diff --git a/protocols/registry/registry.go b/protocols/registry/registry.go index 8a60568..823a171 100644 --- a/protocols/registry/registry.go +++ b/protocols/registry/registry.go @@ -9,7 +9,7 @@ const GroupName = "protocols" type Protocol interface { Name() string - Init() error + Init(...any) error Start(ctx context.Context) error Stop(ctx context.Context) error } diff --git a/protocols/s5.go b/protocols/s5/s5.go similarity index 66% rename from protocols/s5.go rename to protocols/s5/s5.go index 177b02c..f8822fa 100644 --- a/protocols/s5.go +++ b/protocols/s5/s5.go @@ -1,4 +1,4 @@ -package protocols +package s5 import ( "context" @@ -7,7 +7,7 @@ import ( s5config "git.lumeweb.com/LumeWeb/libs5-go/config" s5ed "git.lumeweb.com/LumeWeb/libs5-go/ed25519" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - s5interfaces "git.lumeweb.com/LumeWeb/libs5-go/interfaces" + s5fx "git.lumeweb.com/LumeWeb/libs5-go/fx" s5node "git.lumeweb.com/LumeWeb/libs5-go/node" s5storage "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/types" @@ -21,17 +21,16 @@ import ( ) var ( - _ s5interfaces.ProviderStore = (*S5ProviderStore)(nil) - _ registry.Protocol = (*S5Protocol)(nil) + _ s5storage.ProviderStore = (*S5ProviderStore)(nil) + _ registry.Protocol = (*S5Protocol)(nil) ) type S5Protocol struct { - node s5interfaces.Node - config *viper.Viper - logger *zap.Logger - storage *storage.StorageServiceImpl - identity ed25519.PrivateKey - providerStore *S5ProviderStore + config *viper.Viper + logger *zap.Logger + storage *storage.StorageServiceImpl + identity ed25519.PrivateKey + node *s5node.Node } type S5ProtocolParams struct { @@ -45,32 +44,93 @@ type S5ProtocolParams struct { type S5ProtocolResult struct { fx.Out - Protocol registry.Protocol `group:"protocol"` - S5Protocol *S5Protocol + Protocol registry.Protocol `group:"protocol"` + S5Protocol *S5Protocol + S5NodeConfig *s5config.NodeConfig } -var S5ProtocolModule = fx.Module("s5_api", +var ProtocolModule = fx.Module("s5_api", fx.Provide(NewS5Protocol), fx.Provide(NewS5ProviderStore), + s5fx.Module, ) func NewS5Protocol( params S5ProtocolParams, ) (S5ProtocolResult, error) { proto := &S5Protocol{ - config: params.Config, - logger: params.Logger, - storage: params.Storage, - identity: params.Identity, - providerStore: params.ProviderStore, + config: params.Config, + logger: params.Logger, + storage: params.Storage, + identity: params.Identity, + } + + cfg, err := ConfigureS5Protocol(params) + if err != nil { + return S5ProtocolResult{}, err } return S5ProtocolResult{ - Protocol: proto, - S5Protocol: proto, + Protocol: proto, + S5Protocol: proto, + S5NodeConfig: cfg, }, nil } +func ConfigureS5Protocol(params S5ProtocolParams) (*s5config.NodeConfig, error) { + cfg := &s5config.NodeConfig{ + P2P: s5config.P2PConfig{ + Network: "", + Peers: s5config.PeersConfig{Initial: []string{}}, + }, + KeyPair: s5ed.New(params.Identity), + DB: nil, + Logger: params.Logger.Named("s5"), + HTTP: s5config.HTTPConfig{}, + } + + pconfig := params.Config.Sub("protocol.s5") + + if pconfig == nil { + params.Logger.Fatal("Missing protocol.s5 Config") + } + + err := pconfig.Unmarshal(cfg) + if err != nil { + return nil, err + } + + cfg.HTTP.API.Domain = fmt.Sprintf("s5.%s", params.Config.GetString("core.domain")) + + if params.Config.IsSet("core.externalPort") { + cfg.HTTP.API.Port = params.Config.GetUint("core.externalPort") + } else { + cfg.HTTP.API.Port = params.Config.GetUint("core.port") + } + + dbPath := pconfig.GetString("dbPath") + + if dbPath == "" { + params.Logger.Fatal("protocol.s5.dbPath is required") + } + + _, p, err := ed25519.GenerateKey(nil) + if err != nil { + params.Logger.Fatal("Failed to generate key", zap.Error(err)) + } + + cfg.KeyPair = s5ed.New(p) + + db, err := bolt.Open(dbPath, 0600, nil) + if err != nil { + params.Logger.Fatal("Failed to open db", zap.Error(err)) + } + + cfg.DB = db + + return cfg, nil +} + func NewS5ProviderStore(config *viper.Viper, logger *zap.Logger, storage *storage.StorageServiceImpl) *S5ProviderStore { return &S5ProviderStore{ config: config, @@ -79,64 +139,22 @@ func NewS5ProviderStore(config *viper.Viper, logger *zap.Logger, storage *storag } } -func InitS5Protocol(s5 *S5Protocol) error { - return s5.Init() +func InitProtocol(s5 *S5Protocol, node *s5node.Node, store *S5ProviderStore) error { + return s5.Init(node, store) } -func (s *S5Protocol) Init() error { - cfg := &s5config.NodeConfig{ - P2P: s5config.P2PConfig{ - Network: "", - Peers: s5config.PeersConfig{Initial: []string{}}, - }, - KeyPair: s5ed.New(s.identity), - DB: nil, - Logger: s.logger.Named("s5"), - HTTP: s5config.HTTPConfig{}, - } - - pconfig := s.config.Sub("protocol.s5") - - if pconfig == nil { - s.logger.Fatal("Missing protocol.s5 Config") - } - - err := pconfig.Unmarshal(cfg) - if err != nil { - return err - } - - cfg.HTTP.API.Domain = fmt.Sprintf("s5.%s", s.config.GetString("core.domain")) - - if s.config.IsSet("core.externalPort") { - cfg.HTTP.API.Port = s.config.GetUint("core.externalPort") +func (s *S5Protocol) Init(args ...any) error { + if node, ok := args[0].(*s5node.Node); !ok { + s.logger.Fatal("Node is not a s5 node") } else { - cfg.HTTP.API.Port = s.config.GetUint("core.port") + s.node = node } - dbPath := pconfig.GetString("dbPath") - - if dbPath == "" { - s.logger.Fatal("protocol.s5.dbPath is required") + if store, ok := args[1].(*S5ProviderStore); !ok { + s.logger.Fatal("Store is not a s5 store") + } else { + s.node.Services().Storage().SetProviderStore(store) } - _, p, err := ed25519.GenerateKey(nil) - if err != nil { - s.logger.Fatal("Failed to generate key", zap.Error(err)) - } - - cfg.KeyPair = s5ed.New(p) - - db, err := bolt.Open(dbPath, 0600, nil) - if err != nil { - s.logger.Fatal("Failed to open db", zap.Error(err)) - } - - cfg.DB = db - - s.node = s5node.NewNode(cfg) - - s.node.SetProviderStore(s.providerStore) - return nil } func (s *S5Protocol) Start(ctx context.Context) error { @@ -145,7 +163,7 @@ func (s *S5Protocol) Start(ctx context.Context) error { return err } - identity, err := s.node.Services().P2P().NodeId().ToString() + identity, err := s.node.NodeId().ToString() if err != nil { return err @@ -160,12 +178,12 @@ func (s *S5Protocol) Name() string { return "s5" } -func (s *S5Protocol) Stop(ctx context.Context) error { - return nil +func (s *S5Protocol) Node() *s5node.Node { + return s.node } -func (s *S5Protocol) Node() s5interfaces.Node { - return s.node +func (s *S5Protocol) Stop(ctx context.Context) error { + return nil } type S5ProviderStore struct { @@ -194,7 +212,7 @@ func (s S5ProviderStore) CanProvide(hash *encoding.Multihash, kind []types.Stora return false } -func (s S5ProviderStore) Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (s5interfaces.StorageLocation, error) { +func (s S5ProviderStore) Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (s5storage.StorageLocation, error) { for _, t := range kind { if !s.CanProvide(hash, []types.StorageLocationType{t}) { continue