refactor: restructure s5 protocol/api to use new fx module and new library structure. Also move the proto/api to its own package for organization

This commit is contained in:
Derrick Hammer 2024-01-29 15:11:57 -05:00
parent 9b891f6ec7
commit e034e1d54e
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
6 changed files with 145 additions and 112 deletions

View File

@ -3,6 +3,7 @@ package api
import (
"context"
"git.lumeweb.com/LumeWeb/portal/api/registry"
"git.lumeweb.com/LumeWeb/portal/api/s5"
"github.com/samber/lo"
"github.com/spf13/viper"
"go.uber.org/fx"
@ -11,8 +12,8 @@ import (
func RegisterApis() {
registry.Register(registry.APIEntry{
Key: "s5",
Module: S5Module,
InitFunc: InitS5Api,
Module: s5.Module,
InitFunc: s5.InitAPI,
})
}

View File

@ -10,15 +10,16 @@ import (
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
s5interfaces "git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
s5protocol "git.lumeweb.com/LumeWeb/libs5-go/protocol"
s5storage "git.lumeweb.com/LumeWeb/libs5-go/storage"
libs5node "git.lumeweb.com/LumeWeb/libs5-go/node"
libs5protocol "git.lumeweb.com/LumeWeb/libs5-go/protocol"
libs5service "git.lumeweb.com/LumeWeb/libs5-go/service"
libs5storage "git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/portal/account"
"git.lumeweb.com/LumeWeb/portal/api/middleware"
"git.lumeweb.com/LumeWeb/portal/db/models"
"git.lumeweb.com/LumeWeb/portal/protocols"
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
"git.lumeweb.com/LumeWeb/portal/storage"
emailverifier "github.com/AfterShip/email-verifier"
"github.com/samber/lo"
@ -83,7 +84,7 @@ type HttpHandler struct {
storage *storage.StorageServiceImpl
db *gorm.DB
accounts *account.AccountServiceImpl
protocol *protocols.S5Protocol
protocol *s5.S5Protocol
}
type HttpHandlerParams struct {
@ -94,7 +95,7 @@ type HttpHandlerParams struct {
Storage *storage.StorageServiceImpl
Db *gorm.DB
Accounts *account.AccountServiceImpl
Protocol *protocols.S5Protocol
Protocol *s5.S5Protocol
}
type HttpHandlerResult struct {
@ -955,7 +956,7 @@ func (h *HttpHandler) DebugDownloadUrls(jc jape.Context) {
node := h.getNode()
dlUriProvider := s5storage.NewStorageLocationProvider(node, &decodedCid.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge)
dlUriProvider := h.newStorageLocationProvider(&decodedCid.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge)
err = dlUriProvider.Start()
if err != nil {
@ -971,7 +972,7 @@ func (h *HttpHandler) DebugDownloadUrls(jc jape.Context) {
return
}
locations, err := node.GetCachedStorageLocations(&decodedCid.Hash, []types.StorageLocationType{
locations, err := node.Services().Storage().GetCachedStorageLocations(&decodedCid.Hash, []types.StorageLocationType{
types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge,
})
if err != nil {
@ -980,7 +981,7 @@ func (h *HttpHandler) DebugDownloadUrls(jc jape.Context) {
return
}
availableNodes := lo.Keys[string, s5interfaces.StorageLocation](locations)
availableNodes := lo.Keys[string, libs5storage.StorageLocation](locations)
availableNodesIds := make([]*encoding.NodeId, len(availableNodes))
@ -1072,7 +1073,7 @@ func (h *HttpHandler) RegistrySet(jc jape.Context) {
return
}
entry := s5protocol.NewSignedRegistryEntry(pk, request.Revision, data, signature)
entry := libs5protocol.NewSignedRegistryEntry(pk, request.Revision, data, signature)
err = h.getNode().Services().Registry().Set(entry, false, nil)
if jc.Check("error setting registry entry", err) != nil {
@ -1140,7 +1141,7 @@ func (h *HttpHandler) RegistrySubscription(jc jape.Context) {
break
}
off, err := h.getNode().Services().Registry().Listen(sre, func(entry s5interfaces.SignedRegistryEntry) {
off, err := h.getNode().Services().Registry().Listen(sre, func(entry libs5protocol.SignedRegistryEntry) {
encoded, err := msgpack.Marshal(entry)
if err != nil {
h.logger.Error("error encoding entry", zap.Error(err))
@ -1162,7 +1163,7 @@ func (h *HttpHandler) RegistrySubscription(jc jape.Context) {
}
}
func (h *HttpHandler) getNode() s5interfaces.Node {
func (h *HttpHandler) getNode() *libs5node.Node {
return h.protocol.Node()
}
@ -1180,7 +1181,7 @@ func (h *HttpHandler) DownloadBlob(jc jape.Context) {
return
}
dlUriProvider := s5storage.NewStorageLocationProvider(h.getNode(), &cidDecoded.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge)
dlUriProvider := h.newStorageLocationProvider(&cidDecoded.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge)
err = dlUriProvider.Start()
@ -1234,7 +1235,7 @@ func (h *HttpHandler) DebugStorageLocations(jc jape.Context) {
}
}
dlUriProvider := s5storage.NewStorageLocationProvider(h.getNode(), decodedHash, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge)
dlUriProvider := h.newStorageLocationProvider(decodedHash, typeIntList...)
err = dlUriProvider.Start()
if jc.Check("error starting search", err) != nil {
@ -1246,12 +1247,12 @@ func (h *HttpHandler) DebugStorageLocations(jc jape.Context) {
return
}
locations, err := h.getNode().GetCachedStorageLocations(decodedHash, typeIntList)
locations, err := h.getNode().Services().Storage().GetCachedStorageLocations(decodedHash, typeIntList)
if jc.Check("error getting cached locations", err) != nil {
return
}
availableNodes := lo.Keys[string, s5interfaces.StorageLocation](locations)
availableNodes := lo.Keys[string, libs5storage.StorageLocation](locations)
availableNodesIds := make([]*encoding.NodeId, len(availableNodes))
for i, nodeIdStr := range availableNodes {
@ -1319,7 +1320,7 @@ func (h *HttpHandler) DownloadMetadata(jc jape.Context) {
return
}
meta, err := h.getNode().GetMetadataByCID(cidDecoded)
meta, err := h.getNode().Services().Storage().GetMetadataByCID(cidDecoded)
if jc.Check("error getting metadata", err) != nil {
h.logger.Error("error getting metadata", zap.Error(err))
@ -1379,6 +1380,19 @@ func (h *HttpHandler) DownloadFile(jc jape.Context) {
http.ServeContent(jc.ResponseWriter, jc.Request, file.Name(), file.Modtime(), file)
}
func (h *HttpHandler) newStorageLocationProvider(hash *encoding.Multihash, types ...types.StorageLocationType) libs5storage.StorageLocationProvider {
return libs5storage.NewStorageLocationProvider(libs5storage.StorageLocationProviderParams{
Services: h.getNode().Services(),
Hash: hash,
LocationTypes: types,
ServiceParams: libs5service.ServiceParams{
Logger: h.logger,
Config: h.getNode().Config(),
Db: h.getNode().Db(),
},
})
}
func setAuthCookie(jwt string, jc jape.Context) {
authCookie := http.Cookie{
Name: "s5-auth-token",

View File

@ -1,4 +1,4 @@
package api
package s5
import (
"context"
@ -7,9 +7,8 @@ import (
"git.lumeweb.com/LumeWeb/portal/account"
"git.lumeweb.com/LumeWeb/portal/api/middleware"
"git.lumeweb.com/LumeWeb/portal/api/registry"
"git.lumeweb.com/LumeWeb/portal/api/s5"
"git.lumeweb.com/LumeWeb/portal/protocols"
protoRegistry "git.lumeweb.com/LumeWeb/portal/protocols/registry"
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
"git.lumeweb.com/LumeWeb/portal/storage"
"github.com/rs/cors"
"github.com/spf13/viper"
@ -29,18 +28,18 @@ type S5API struct {
accounts *account.AccountServiceImpl
storage *storage.StorageServiceImpl
protocols []protoRegistry.Protocol
httpHandler s5.HttpHandler
protocol *protocols.S5Protocol
httpHandler HttpHandler
protocol *s5.S5Protocol
}
type S5ApiParams struct {
type APIParams struct {
fx.In
Config *viper.Viper
Identity ed25519.PrivateKey
Accounts *account.AccountServiceImpl
Storage *storage.StorageServiceImpl
Protocols []protoRegistry.Protocol `group:"protocol"`
HttpHandler s5.HttpHandler
HttpHandler HttpHandler
}
type S5ApiResult struct {
@ -49,7 +48,7 @@ type S5ApiResult struct {
S5API *S5API
}
func NewS5(params S5ApiParams) (S5ApiResult, error) {
func NewS5(params APIParams) (S5ApiResult, error) {
api := &S5API{
config: params.Config,
identity: params.Identity,
@ -64,13 +63,13 @@ func NewS5(params S5ApiParams) (S5ApiResult, error) {
}, nil
}
func InitS5Api(api *S5API) error {
func InitAPI(api *S5API) error {
return api.Init()
}
var S5Module = fx.Module("s5_api",
var Module = fx.Module("s5_api",
fx.Provide(NewS5),
fx.Provide(s5.NewHttpHandler),
fx.Provide(NewHttpHandler),
)
func (s *S5API) Init() error {
@ -79,7 +78,7 @@ func (s *S5API) Init() error {
return fmt.Errorf("s5 protocol not found")
}
s5protocolInstance := s5protocol.(*protocols.S5Protocol)
s5protocolInstance := s5protocol.(*s5.S5Protocol)
s.protocol = s5protocolInstance
router := s5protocolInstance.Node().Services().HTTP().GetHttpRouter(getRoutes(s))
middleware.RegisterProtocolSubdomain(s.config, router, "s5")

View File

@ -3,6 +3,7 @@ package protocols
import (
"context"
"git.lumeweb.com/LumeWeb/portal/protocols/registry"
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
"github.com/samber/lo"
"github.com/spf13/viper"
"go.uber.org/fx"
@ -11,8 +12,8 @@ import (
func RegisterProtocols() {
registry.Register(registry.ProtocolEntry{
Key: "s5",
Module: S5ProtocolModule,
InitFunc: InitS5Protocol,
Module: s5.ProtocolModule,
InitFunc: s5.InitProtocol,
})
}

View File

@ -9,7 +9,7 @@ const GroupName = "protocols"
type Protocol interface {
Name() string
Init() error
Init(...any) error
Start(ctx context.Context) error
Stop(ctx context.Context) error
}

View File

@ -1,4 +1,4 @@
package protocols
package s5
import (
"context"
@ -7,7 +7,7 @@ import (
s5config "git.lumeweb.com/LumeWeb/libs5-go/config"
s5ed "git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
s5interfaces "git.lumeweb.com/LumeWeb/libs5-go/interfaces"
s5fx "git.lumeweb.com/LumeWeb/libs5-go/fx"
s5node "git.lumeweb.com/LumeWeb/libs5-go/node"
s5storage "git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/types"
@ -21,17 +21,16 @@ import (
)
var (
_ s5interfaces.ProviderStore = (*S5ProviderStore)(nil)
_ registry.Protocol = (*S5Protocol)(nil)
_ s5storage.ProviderStore = (*S5ProviderStore)(nil)
_ registry.Protocol = (*S5Protocol)(nil)
)
type S5Protocol struct {
node s5interfaces.Node
config *viper.Viper
logger *zap.Logger
storage *storage.StorageServiceImpl
identity ed25519.PrivateKey
providerStore *S5ProviderStore
config *viper.Viper
logger *zap.Logger
storage *storage.StorageServiceImpl
identity ed25519.PrivateKey
node *s5node.Node
}
type S5ProtocolParams struct {
@ -45,32 +44,93 @@ type S5ProtocolParams struct {
type S5ProtocolResult struct {
fx.Out
Protocol registry.Protocol `group:"protocol"`
S5Protocol *S5Protocol
Protocol registry.Protocol `group:"protocol"`
S5Protocol *S5Protocol
S5NodeConfig *s5config.NodeConfig
}
var S5ProtocolModule = fx.Module("s5_api",
var ProtocolModule = fx.Module("s5_api",
fx.Provide(NewS5Protocol),
fx.Provide(NewS5ProviderStore),
s5fx.Module,
)
func NewS5Protocol(
params S5ProtocolParams,
) (S5ProtocolResult, error) {
proto := &S5Protocol{
config: params.Config,
logger: params.Logger,
storage: params.Storage,
identity: params.Identity,
providerStore: params.ProviderStore,
config: params.Config,
logger: params.Logger,
storage: params.Storage,
identity: params.Identity,
}
cfg, err := ConfigureS5Protocol(params)
if err != nil {
return S5ProtocolResult{}, err
}
return S5ProtocolResult{
Protocol: proto,
S5Protocol: proto,
Protocol: proto,
S5Protocol: proto,
S5NodeConfig: cfg,
}, nil
}
func ConfigureS5Protocol(params S5ProtocolParams) (*s5config.NodeConfig, error) {
cfg := &s5config.NodeConfig{
P2P: s5config.P2PConfig{
Network: "",
Peers: s5config.PeersConfig{Initial: []string{}},
},
KeyPair: s5ed.New(params.Identity),
DB: nil,
Logger: params.Logger.Named("s5"),
HTTP: s5config.HTTPConfig{},
}
pconfig := params.Config.Sub("protocol.s5")
if pconfig == nil {
params.Logger.Fatal("Missing protocol.s5 Config")
}
err := pconfig.Unmarshal(cfg)
if err != nil {
return nil, err
}
cfg.HTTP.API.Domain = fmt.Sprintf("s5.%s", params.Config.GetString("core.domain"))
if params.Config.IsSet("core.externalPort") {
cfg.HTTP.API.Port = params.Config.GetUint("core.externalPort")
} else {
cfg.HTTP.API.Port = params.Config.GetUint("core.port")
}
dbPath := pconfig.GetString("dbPath")
if dbPath == "" {
params.Logger.Fatal("protocol.s5.dbPath is required")
}
_, p, err := ed25519.GenerateKey(nil)
if err != nil {
params.Logger.Fatal("Failed to generate key", zap.Error(err))
}
cfg.KeyPair = s5ed.New(p)
db, err := bolt.Open(dbPath, 0600, nil)
if err != nil {
params.Logger.Fatal("Failed to open db", zap.Error(err))
}
cfg.DB = db
return cfg, nil
}
func NewS5ProviderStore(config *viper.Viper, logger *zap.Logger, storage *storage.StorageServiceImpl) *S5ProviderStore {
return &S5ProviderStore{
config: config,
@ -79,64 +139,22 @@ func NewS5ProviderStore(config *viper.Viper, logger *zap.Logger, storage *storag
}
}
func InitS5Protocol(s5 *S5Protocol) error {
return s5.Init()
func InitProtocol(s5 *S5Protocol, node *s5node.Node, store *S5ProviderStore) error {
return s5.Init(node, store)
}
func (s *S5Protocol) Init() error {
cfg := &s5config.NodeConfig{
P2P: s5config.P2PConfig{
Network: "",
Peers: s5config.PeersConfig{Initial: []string{}},
},
KeyPair: s5ed.New(s.identity),
DB: nil,
Logger: s.logger.Named("s5"),
HTTP: s5config.HTTPConfig{},
}
pconfig := s.config.Sub("protocol.s5")
if pconfig == nil {
s.logger.Fatal("Missing protocol.s5 Config")
}
err := pconfig.Unmarshal(cfg)
if err != nil {
return err
}
cfg.HTTP.API.Domain = fmt.Sprintf("s5.%s", s.config.GetString("core.domain"))
if s.config.IsSet("core.externalPort") {
cfg.HTTP.API.Port = s.config.GetUint("core.externalPort")
func (s *S5Protocol) Init(args ...any) error {
if node, ok := args[0].(*s5node.Node); !ok {
s.logger.Fatal("Node is not a s5 node")
} else {
cfg.HTTP.API.Port = s.config.GetUint("core.port")
s.node = node
}
dbPath := pconfig.GetString("dbPath")
if dbPath == "" {
s.logger.Fatal("protocol.s5.dbPath is required")
if store, ok := args[1].(*S5ProviderStore); !ok {
s.logger.Fatal("Store is not a s5 store")
} else {
s.node.Services().Storage().SetProviderStore(store)
}
_, p, err := ed25519.GenerateKey(nil)
if err != nil {
s.logger.Fatal("Failed to generate key", zap.Error(err))
}
cfg.KeyPair = s5ed.New(p)
db, err := bolt.Open(dbPath, 0600, nil)
if err != nil {
s.logger.Fatal("Failed to open db", zap.Error(err))
}
cfg.DB = db
s.node = s5node.NewNode(cfg)
s.node.SetProviderStore(s.providerStore)
return nil
}
func (s *S5Protocol) Start(ctx context.Context) error {
@ -145,7 +163,7 @@ func (s *S5Protocol) Start(ctx context.Context) error {
return err
}
identity, err := s.node.Services().P2P().NodeId().ToString()
identity, err := s.node.NodeId().ToString()
if err != nil {
return err
@ -160,12 +178,12 @@ func (s *S5Protocol) Name() string {
return "s5"
}
func (s *S5Protocol) Stop(ctx context.Context) error {
return nil
func (s *S5Protocol) Node() *s5node.Node {
return s.node
}
func (s *S5Protocol) Node() s5interfaces.Node {
return s.node
func (s *S5Protocol) Stop(ctx context.Context) error {
return nil
}
type S5ProviderStore struct {
@ -194,7 +212,7 @@ func (s S5ProviderStore) CanProvide(hash *encoding.Multihash, kind []types.Stora
return false
}
func (s S5ProviderStore) Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (s5interfaces.StorageLocation, error) {
func (s S5ProviderStore) Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (s5storage.StorageLocation, error) {
for _, t := range kind {
if !s.CanProvide(hash, []types.StorageLocationType{t}) {
continue