diff --git a/go.mod b/go.mod index c045be9..f9e3cc0 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/vmihailenco/msgpack/v5 v5.4.1 go.etcd.io/bbolt v1.3.8 go.sia.tech/jape v0.11.1 + go.uber.org/fx v1.20.1 go.uber.org/zap v1.26.0 nhooyr.io/websocket v1.7.1 ) @@ -26,6 +27,7 @@ require ( github.com/multiformats/go-base36 v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + go.uber.org/dig v1.17.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/node/node.go b/node/node.go index 43beb4e..c8d8d10 100644 --- a/node/node.go +++ b/node/node.go @@ -17,7 +17,6 @@ import ( "github.com/vmihailenco/msgpack/v5" bolt "go.etcd.io/bbolt" "go.uber.org/zap" - "sync" "time" ) @@ -26,11 +25,9 @@ const cacheBucketName = "object-cache" type Node struct { nodeConfig *config.NodeConfig metadataCache structs.Map - started bool hashQueryRoutingTable structs.Map services service.Services httpClient *resty.Client - connections sync.WaitGroup providerStore storage.ProviderStore } @@ -42,24 +39,21 @@ func (n *Node) Services() service.Services { return n.services } -func NewNode(config *config.NodeConfig) *Node { - n := &Node{ +func NewNode(config *config.NodeConfig, services service.Services) *Node { + return &Node{ nodeConfig: config, metadataCache: structs.NewMap(), - started: false, hashQueryRoutingTable: structs.NewMap(), httpClient: resty.New(), + services: services, // Services are passed in, not created here } - n.services = NewServices(service.NewP2P(n), service.NewRegistry(n), service.NewHTTP(n)) - - return n } func (n *Node) HashQueryRoutingTable() structs.Map { return n.hashQueryRoutingTable } func (n *Node) IsStarted() bool { - return n.started + return n.services.IsStarted() } func (n *Node) Config() *config.NodeConfig { @@ -81,8 +75,8 @@ func (n *Node) Db() *bolt.DB { } func (n *Node) Start() error { - protocol.Init() - signed.Init() + protocol.RegisterProtocols() + signed.RegisterSignedProtocols() err := utils.CreateBucket(cacheBucketName, n.Db()) @@ -90,22 +84,13 @@ func (n *Node) Start() error { return err } - n.started = true - - for _, svc := range n.services.All() { - err := svc.Init() - if err != nil { - return err - } - - err = svc.Start() - if err != nil { - return err - } - } - n.started = true - return nil + return n.services.Start() } + +func (n *Node) Stop() error { + return n.services.Stop() +} + func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { locations := make(map[string]storage.StorageLocation) @@ -325,11 +310,7 @@ func (n *Node) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err er return md, nil } func (n *Node) WaitOnConnectedPeers() { - n.connections.Wait() -} - -func (n *Node) ConnectionTracker() *sync.WaitGroup { - return &n.connections + n.services.P2P().WaitOnConnectedPeers() } func (n *Node) SetProviderStore(store storage.ProviderStore) { @@ -339,3 +320,26 @@ func (n *Node) SetProviderStore(store storage.ProviderStore) { func (n *Node) ProviderStore() storage.ProviderStore { return n.providerStore } + +func DefaultNode(config *config.NodeConfig) *Node { + params := service.ServiceParams{ + Logger: config.Logger, + Config: config, + Db: config.DB, + } + + // Initialize services first + p2pService := service.NewP2P(params) + registryService := service.NewRegistry(params) + httpService := service.NewHTTP(params) + + // Aggregate services + services := NewServices(ServicesParams{ + P2P: p2pService, + Registry: registryService, + HTTP: httpService, + }) + + // Now create the node with the services + return NewNode(config, services) +} diff --git a/node/services.go b/node/services.go index df8eebc..e1492d1 100644 --- a/node/services.go +++ b/node/services.go @@ -8,10 +8,17 @@ var ( _ service.Services = (*ServicesImpl)(nil) ) +type ServicesParams struct { + P2P *service.P2PService + Registry *service.RegistryService + HTTP *service.HTTPService +} + type ServicesImpl struct { p2p *service.P2PService registry *service.RegistryService http *service.HTTPService + started bool } func (s *ServicesImpl) HTTP() *service.HTTPService { @@ -31,14 +38,50 @@ func (s *ServicesImpl) Registry() *service.RegistryService { return s.registry } -func NewServices(p2p *service.P2PService, registry *service.RegistryService, http *service.HTTPService) service.Services { - return &ServicesImpl{ - p2p: p2p, - registry: registry, - http: http, +func NewServices(params ServicesParams) service.Services { + sc := &ServicesImpl{ + p2p: params.P2P, + registry: params.Registry, + http: params.HTTP, + started: false, } + + for _, svc := range sc.All() { + svc.SetServices(sc) + } + + return sc } func (s *ServicesImpl) P2P() *service.P2PService { return s.p2p } + +func (s *ServicesImpl) IsStarted() bool { + return s.started +} + +func (s *ServicesImpl) Start() error { + for _, svc := range s.All() { + err := svc.Start() + if err != nil { + return err + } + } + + s.started = true + + return nil +} +func (s *ServicesImpl) Stop() error { + for _, svc := range s.All() { + err := svc.Stop() + if err != nil { + return err + } + } + + s.started = false + + return nil +} diff --git a/protocol/message.go b/protocol/message.go index 8426e81..38d7e41 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -10,7 +10,7 @@ var ( messageTypes map[int]func() base.IncomingMessage ) -func Init() { +func RegisterProtocols() { messageTypes = make(map[int]func() base.IncomingMessage) // Register factory functions instead of instances diff --git a/protocol/signed/signed.go b/protocol/signed/signed.go index 5565698..56b9dfa 100644 --- a/protocol/signed/signed.go +++ b/protocol/signed/signed.go @@ -22,7 +22,7 @@ var ( messageTypes map[int]func() IncomingMessageSigned ) -func Init() { +func RegisterSignedProtocols() { messageTypes = make(map[int]func() IncomingMessageSigned) RegisterMessageType(int(types.ProtocolMethodHandshakeDone), func() IncomingMessageSigned { diff --git a/protocol/signed/signed_message.go b/protocol/signed/signed_message.go index 87dd468..9d34eba 100644 --- a/protocol/signed/signed_message.go +++ b/protocol/signed/signed_message.go @@ -3,8 +3,8 @@ package signed import ( "crypto/ed25519" "errors" + "git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - _node "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" @@ -171,7 +171,7 @@ func (s *SignedMessage) EncodeMsgpack(enc *msgpack.Encoder) error { return nil } -func (s *SignedMessage) Sign(node *_node.Node) error { +func (s *SignedMessage) Sign(cfg *config.NodeConfig) error { if s.nodeId == nil { panic("nodeId is nil") } @@ -180,7 +180,7 @@ func (s *SignedMessage) Sign(node *_node.Node) error { panic("message is nil") } - s.signature = ed25519.Sign(node.Config().KeyPair.ExtractBytes(), s.message) + s.signature = ed25519.Sign(cfg.KeyPair.ExtractBytes(), s.message) return nil } diff --git a/service/http.go b/service/http.go index 14aed62..bc154fb 100644 --- a/service/http.go +++ b/service/http.go @@ -3,7 +3,6 @@ package service import ( "git.lumeweb.com/LumeWeb/libs5-go/build" "git.lumeweb.com/LumeWeb/libs5-go/net" - _node "git.lumeweb.com/LumeWeb/libs5-go/node" "github.com/julienschmidt/httprouter" "go.sia.tech/jape" "go.uber.org/zap" @@ -23,12 +22,16 @@ type P2PNodeResponse struct { } type HTTPService struct { - node *_node.Node + ServiceBase } -func NewHTTP(node *_node.Node) *HTTPService { +func NewHTTP(params ServiceParams) *HTTPService { return &HTTPService{ - node: node, + ServiceBase: ServiceBase{ + logger: params.Logger, + config: params.Config, + db: params.Db, + }, } } @@ -46,10 +49,6 @@ func (h *HTTPService) GetHttpRouter(inject map[string]jape.Handler) *httprouter. return jape.Mux(routes) } -func (h *HTTPService) Node() *_node.Node { - return h.node -} - func (h *HTTPService) Start() error { return nil } @@ -68,7 +67,7 @@ func (h *HTTPService) versionHandler(ctx jape.Context) { func (h *HTTPService) p2pHandler(ctx jape.Context) { c, err := websocket.Accept(ctx.ResponseWriter, ctx.Request, nil) if err != nil { - h.node.Logger().Error("error accepting websocket connection", zap.Error(err)) + h.logger.Error("error accepting websocket connection", zap.Error(err)) return } @@ -78,33 +77,33 @@ func (h *HTTPService) p2pHandler(ctx jape.Context) { }) if err != nil { - h.node.Logger().Error("error creating transport peer", zap.Error(err)) + h.logger.Error("error creating transport peer", zap.Error(err)) err := c.Close(websocket.StatusInternalError, "the sky is falling") if err != nil { - h.node.Logger().Error("error closing websocket connection", zap.Error(err)) + h.logger.Error("error closing websocket connection", zap.Error(err)) } return } - h.Node().ConnectionTracker().Add(1) + h.services.P2P().ConnectionTracker().Add(1) go func() { - err := h.node.Services().P2P().OnNewPeer(peer, false) + err := h.services.P2P().OnNewPeer(peer, false) if err != nil { - h.node.Logger().Error("error handling new peer", zap.Error(err)) + h.logger.Error("error handling new peer", zap.Error(err)) } - h.node.ConnectionTracker().Done() + h.services.P2P().ConnectionTracker().Done() }() } func (h *HTTPService) p2pNodesHandler(ctx jape.Context) { - localId, err := h.node.Services().P2P().NodeId().ToString() + localId, err := h.services.P2P().NodeId().ToString() if ctx.Check("error getting local node id", err) != nil { return } - uris := h.node.Services().P2P().SelfConnectionUris() + uris := h.services.P2P().SelfConnectionUris() nodeList := make([]P2PNodeResponse, len(uris)) diff --git a/service/p2p.go b/service/p2p.go index 20244d1..4bb2949 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -9,7 +9,6 @@ import ( "git.lumeweb.com/LumeWeb/libs5-go/ed25519" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" - _node "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" @@ -36,12 +35,10 @@ var ( const nodeBucketName = "nodes" type P2PService struct { - logger *zap.Logger nodeKeyPair *ed25519.KeyPairEd25519 localNodeID *encoding.NodeId networkID string nodesBucket *bolt.Bucket - node *_node.Node inited bool reconnectDelay structs.Map peers structs.Map @@ -52,19 +49,19 @@ type P2PService struct { incomingIPBlocklist structs.Map outgoingPeerFailures structs.Map maxOutgoingPeerFailures uint + connections sync.WaitGroup + ServiceBase } -func NewP2P(node *_node.Node) *P2PService { - uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", node.Config().HTTP.API.Domain, node.Config().HTTP.API.Port)) +func NewP2P(params ServiceParams) *P2PService { + uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", params.Config.HTTP.API.Domain, params.Config.HTTP.API.Port)) if err != nil { - node.Logger().Fatal("failed to HTTP API URL Config", zap.Error(err)) + params.Logger.Fatal("failed to parse HTTP API URL", zap.Error(err)) } service := &P2PService{ - logger: node.Logger(), - nodeKeyPair: node.Config().KeyPair, - networkID: node.Config().P2P.Network, - node: node, + nodeKeyPair: params.Config.KeyPair, + networkID: params.Config.P2P.Network, inited: false, reconnectDelay: structs.NewMap(), peers: structs.NewMap(), @@ -74,7 +71,12 @@ func NewP2P(node *_node.Node) *P2PService { incomingPeerBlockList: structs.NewMap(), incomingIPBlocklist: structs.NewMap(), outgoingPeerFailures: structs.NewMap(), - maxOutgoingPeerFailures: node.Config().P2P.MaxOutgoingPeerFailures, + maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures, + ServiceBase: ServiceBase{ + logger: params.Logger, + config: params.Config, + db: params.Db, + }, } return service @@ -84,16 +86,12 @@ func (p *P2PService) SelfConnectionUris() []*url.URL { return p.selfConnectionUris } -func (p *P2PService) Node() *_node.Node { - return p.node -} - func (p *P2PService) Peers() structs.Map { return p.peers } func (p *P2PService) Start() error { - config := p.Node().Config() + config := p.config if len(config.P2P.Peers.Initial) > 0 { initialPeers := config.P2P.Peers.Initial @@ -117,7 +115,7 @@ func (p *P2PService) Start() error { } func (p *P2PService) Stop() error { - panic("implement me") + return nil } func (p *P2PService) Init() error { @@ -126,7 +124,7 @@ func (p *P2PService) Init() error { } p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey()) - err := utils.CreateBucket(nodeBucketName, p.Node().Db()) + err := utils.CreateBucket(nodeBucketName, p.db) if err != nil { return err @@ -137,7 +135,7 @@ func (p *P2PService) Init() error { return nil } func (p *P2PService) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error { - if !p.Node().IsStarted() { + if !p.services.IsStarted() { return nil } @@ -322,7 +320,7 @@ func (p *P2PService) ConnectToNode(connectionUris []*url.URL, retried bool, from peer.SetId(id) - p.Node().ConnectionTracker().Add(1) + p.services.P2P().ConnectionTracker().Add(1) peerId, err := peer.Id().ToString() if err != nil { @@ -335,7 +333,7 @@ func (p *P2PService) ConnectToNode(connectionUris []*url.URL, retried bool, from if err != nil && !peer.Abuser() { p.logger.Error("peer error", zap.Error(err)) } - p.Node().ConnectionTracker().Done() + p.services.P2P().ConnectionTracker().Done() }() return nil @@ -443,7 +441,6 @@ func (p *P2PService) OnNewPeerListen(peer net.Peer, verifyId bool) { Original: message, Data: reader.Data, Ctx: context.Background(), - Node: p.node, Peer: peer, VerifyId: verifyId, } @@ -473,7 +470,7 @@ func (p *P2PService) OnNewPeerListen(peer net.Peer, verifyId bool) { func (p *P2PService) readNodeVotes(nodeId *encoding.NodeId) (NodeVotes, error) { var value []byte var found bool - err := p.node.Db().View(func(tx *bolt.Tx) error { + err := p.db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(nodeBucketName)) if b == nil { return fmt.Errorf("Bucket %s not found", nodeBucketName) @@ -509,7 +506,7 @@ func (p *P2PService) saveNodeVotes(nodeId *encoding.NodeId, votes NodeVotes) err } // Use a transaction to save the data - err = p.node.Db().Update(func(tx *bolt.Tx) error { + err = p.db.Update(func(tx *bolt.Tx) error { // Get or create the bucket b := tx.Bucket([]byte(nodeBucketName)) @@ -557,7 +554,7 @@ func (p *P2PService) SignMessageSimple(message []byte) ([]byte, error) { signedMessage := signed.NewSignedMessageRequest(message) signedMessage.SetNodeId(p.localNodeID) - err := signedMessage.Sign(p.Node()) + err := signedMessage.Sign(p.config) if err != nil { return nil, err @@ -615,7 +612,7 @@ func (p *P2PService) SendHashRequest(hash *encoding.Multihash, kinds []types.Sto for _, peer := range p.peers.Values() { peerValue, ok := peer.(net.Peer) if !ok { - p.node.Logger().Error("failed to cast peer to net.Peer") + p.logger.Error("failed to cast peer to net.Peer") continue } err = peerValue.SendMessage(message) @@ -706,3 +703,11 @@ func (p *P2PService) PrepareProvideMessage(hash *encoding.Multihash, location st // Return the final byte slice. return finalList } + +func (p *P2PService) WaitOnConnectedPeers() { + p.connections.Wait() +} + +func (p *P2PService) ConnectionTracker() *sync.WaitGroup { + return &p.connections +} diff --git a/service/registry.go b/service/registry.go index 7d7727f..4d51c82 100644 --- a/service/registry.go +++ b/service/registry.go @@ -4,7 +4,6 @@ import ( "errors" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" - _node "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" @@ -23,14 +22,9 @@ var ( ) type RegistryService struct { - node *_node.Node - logger *zap.Logger streams structs.Map subs structs.Map -} - -func (r *RegistryService) Node() *_node.Node { - return r.node + ServiceBase } func (r *RegistryService) Start() error { @@ -42,15 +36,18 @@ func (r *RegistryService) Stop() error { } func (r *RegistryService) Init() error { - return utils.CreateBucket(registryBucketName, r.node.Db()) + return utils.CreateBucket(registryBucketName, r.db) } -func NewRegistry(node *_node.Node) *RegistryService { +func NewRegistry(params ServiceParams) *RegistryService { return &RegistryService{ - node: node, - logger: node.Logger(), streams: structs.NewMap(), subs: structs.NewMap(), + ServiceBase: ServiceBase{ + logger: params.Logger, + config: params.Config, + db: params.Db, + }, } } func (r *RegistryService) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error { @@ -120,7 +117,7 @@ func (r *RegistryService) Set(sre protocol.SignedRegistryEntry, trusted bool, re go event.Emit("fire", sre) } - err = r.node.Db().Update(func(txn *bbolt.Tx) error { + err = r.db.Update(func(txn *bbolt.Tx) error { bucket := txn.Bucket([]byte(registryBucketName)) err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre)) if err != nil { @@ -153,7 +150,7 @@ func (r *RegistryService) BroadcastEntry(sre protocol.SignedRegistryEntry, recei r.logger.Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid)) updateMessage := protocol.MarshalSignedRegistryEntry(sre) - for _, p := range r.node.Services().P2P().Peers().Values() { + for _, p := range r.services.P2P().Peers().Values() { peer, ok := p.(net.Peer) if !ok { continue @@ -182,7 +179,7 @@ func (r *RegistryService) SendRegistryRequest(pk []byte) error { } // Iterate over peers and send the request - for _, peerVal := range r.node.Services().P2P().Peers().Values() { + for _, peerVal := range r.services.P2P().Peers().Values() { peer, ok := peerVal.(net.Peer) if !ok { continue @@ -295,7 +292,7 @@ func (r *RegistryService) Listen(pk []byte, cb func(sre protocol.SignedRegistryE } func (r *RegistryService) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) { - err = r.node.Db().View(func(txn *bbolt.Tx) error { + err = r.db.View(func(txn *bbolt.Tx) error { bucket := txn.Bucket([]byte(registryBucketName)) val := bucket.Get(pk) if val != nil { diff --git a/service/service.go b/service/service.go index 2781539..128adce 100644 --- a/service/service.go +++ b/service/service.go @@ -1,16 +1,39 @@ package service -import "git.lumeweb.com/LumeWeb/libs5-go/node" +import ( + "git.lumeweb.com/LumeWeb/libs5-go/config" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" +) type Service interface { - Node() *node.Node Start() error Stop() error Init() error + SetServices(services Services) } type Services interface { P2P() *P2PService Registry() *RegistryService HTTP() *HTTPService All() []Service + IsStarted() bool + Start() error + Stop() error +} +type ServiceParams struct { + Logger *zap.Logger + Config *config.NodeConfig + Db *bolt.DB +} + +type ServiceBase struct { + logger *zap.Logger + config *config.NodeConfig + db *bolt.DB + services Services +} + +func (s *ServiceBase) SetServices(services Services) { + s.services = services }