From a0dcc52d63384dfc0219b0e1d54d52ac976e9114 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sun, 28 Jan 2024 23:59:43 -0500 Subject: [PATCH] refactor: remove dedicated interfaces and minimize interfaces --- interfaces/http.go | 13 -------- interfaces/meta.go | 7 ---- interfaces/node.go | 34 ------------------- interfaces/p2p.go | 30 ----------------- interfaces/provider_store.go | 11 ------- interfaces/registry.go | 30 ----------------- interfaces/service.go | 16 --------- interfaces/storage.go | 33 ------------------- interfaces/vote.go | 14 -------- node/node.go | 53 ++++++++++++++---------------- node/services.go | 24 +++++++------- protocol/base/base.go | 2 +- protocol/registry.go | 35 ++++++++++++++------ protocol/registry_entry.go | 5 ++- service/http.go | 28 ++++++++-------- service/p2p.go | 63 ++++++++++++++++++------------------ service/registry.go | 38 +++++++++++----------- service/service.go | 16 +++++++++ service/vote.go | 14 ++++++-- storage/storage.go | 63 +++++++++++++++++++++++++++--------- 20 files changed, 206 insertions(+), 323 deletions(-) delete mode 100644 interfaces/http.go delete mode 100644 interfaces/meta.go delete mode 100644 interfaces/node.go delete mode 100644 interfaces/p2p.go delete mode 100644 interfaces/provider_store.go delete mode 100644 interfaces/registry.go delete mode 100644 interfaces/service.go delete mode 100644 interfaces/storage.go delete mode 100644 interfaces/vote.go create mode 100644 service/service.go diff --git a/interfaces/http.go b/interfaces/http.go deleted file mode 100644 index d13ffef..0000000 --- a/interfaces/http.go +++ /dev/null @@ -1,13 +0,0 @@ -package interfaces - -import ( - "github.com/julienschmidt/httprouter" - "go.sia.tech/jape" -) - -//go:generate mockgen -source=http.go -destination=../mocks/interfaces/http.go -package=interfaces - -type HTTPService interface { - Service - GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router -} diff --git a/interfaces/meta.go b/interfaces/meta.go deleted file mode 100644 index 38253ab..0000000 --- a/interfaces/meta.go +++ /dev/null @@ -1,7 +0,0 @@ -package interfaces - -//go:generate mockgen -source=meta.go -destination=../mocks/interfaces/meta.go -package=interfaces - -type Metadata interface { - ToJson() map[string]interface{} -} diff --git a/interfaces/node.go b/interfaces/node.go deleted file mode 100644 index d5dfdf6..0000000 --- a/interfaces/node.go +++ /dev/null @@ -1,34 +0,0 @@ -package interfaces - -import ( - "git.lumeweb.com/LumeWeb/libs5-go/config" - "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/metadata" - "git.lumeweb.com/LumeWeb/libs5-go/structs" - "git.lumeweb.com/LumeWeb/libs5-go/types" - bolt "go.etcd.io/bbolt" - "go.uber.org/zap" - "sync" -) - -//go:generate mockgen -source=node.go -destination=../mocks/interfaces/node.go -package=interfaces - -type Node interface { - Services() Services - HashQueryRoutingTable() structs.Map - IsStarted() bool - Config() *config.NodeConfig - Logger() *zap.Logger - Db() *bolt.DB - Start() error - GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]StorageLocation, error) - AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location StorageLocation, message []byte) error - NetworkId() string - DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) - DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) - GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error) - WaitOnConnectedPeers() - ConnectionTracker() *sync.WaitGroup - SetProviderStore(store ProviderStore) - ProviderStore() ProviderStore -} diff --git a/interfaces/p2p.go b/interfaces/p2p.go deleted file mode 100644 index 9568e77..0000000 --- a/interfaces/p2p.go +++ /dev/null @@ -1,30 +0,0 @@ -package interfaces - -import ( - "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/structs" - "git.lumeweb.com/LumeWeb/libs5-go/types" - "net/url" -) - -//go:generate mockgen -source=p2p.go -destination=../mocks/interfaces/p2p.go -package=interfaces - -type P2PService interface { - Peers() structs.Map - ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error - OnNewPeer(peer net.Peer, verifyId bool) error - OnNewPeerListen(peer net.Peer, verifyId bool) - GetNodeScore(nodeId *encoding.NodeId) (float64, error) - SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) - SignMessageSimple(message []byte) ([]byte, error) - AddPeer(peer net.Peer) error - SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error - SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error - UpVote(nodeId *encoding.NodeId) error - DownVote(nodeId *encoding.NodeId) error - NodeId() *encoding.NodeId - SelfConnectionUris() []*url.URL - PrepareProvideMessage(hash *encoding.Multihash, location StorageLocation) []byte - Service -} diff --git a/interfaces/provider_store.go b/interfaces/provider_store.go deleted file mode 100644 index 315dc21..0000000 --- a/interfaces/provider_store.go +++ /dev/null @@ -1,11 +0,0 @@ -package interfaces - -import ( - "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/types" -) - -type ProviderStore interface { - CanProvide(hash *encoding.Multihash, kind []types.StorageLocationType) bool - Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (StorageLocation, error) -} diff --git a/interfaces/registry.go b/interfaces/registry.go deleted file mode 100644 index 4293750..0000000 --- a/interfaces/registry.go +++ /dev/null @@ -1,30 +0,0 @@ -package interfaces - -import "git.lumeweb.com/LumeWeb/libs5-go/net" - -//go:generate mockgen -source=registry.go -destination=../mocks/interfaces/registry.go -package=interfaces - -type SignedRegistryEntry interface { - PK() []byte - Revision() uint64 - Data() []byte - Signature() []byte - SetPK(pk []byte) - SetRevision(revision uint64) - SetData(data []byte) - SetSignature(signature []byte) - Verify() bool -} - -type RegistryEntry interface { - Sign() SignedRegistryEntry -} - -type RegistryService interface { - Set(sre SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error - Get(pk []byte) (SignedRegistryEntry, error) - BroadcastEntry(sre SignedRegistryEntry, receivedFrom net.Peer) error - SendRegistryRequest(pk []byte) error - Listen(pk []byte, cb func(sre SignedRegistryEntry)) (func(), error) - Service -} diff --git a/interfaces/service.go b/interfaces/service.go deleted file mode 100644 index 49138f0..0000000 --- a/interfaces/service.go +++ /dev/null @@ -1,16 +0,0 @@ -package interfaces - -//go:generate mockgen -source=service.go -destination=../mocks/interfaces/service.go -package=interfaces - -type Service interface { - Node() Node - Start() error - Stop() error - Init() error -} -type Services interface { - P2P() P2PService - Registry() RegistryService - HTTP() HTTPService - All() []Service -} diff --git a/interfaces/storage.go b/interfaces/storage.go deleted file mode 100644 index f178f1c..0000000 --- a/interfaces/storage.go +++ /dev/null @@ -1,33 +0,0 @@ -package interfaces - -import "git.lumeweb.com/LumeWeb/libs5-go/encoding" - -//go:generate mockgen -source=storage.go -destination=../mocks/interfaces/storage.go -package=interfaces - -type StorageLocationProvider interface { - Start() error - Next() (SignedStorageLocation, error) - Upvote(uri SignedStorageLocation) error - Downvote(uri SignedStorageLocation) error -} - -type StorageLocation interface { - BytesURL() string - OutboardBytesURL() string - String() string - ProviderMessage() []byte - Type() int - Parts() []string - BinaryParts() [][]byte - Expiry() int64 - SetProviderMessage(msg []byte) - SetType(t int) - SetParts(p []string) - SetBinaryParts(bp [][]byte) - SetExpiry(e int64) -} -type SignedStorageLocation interface { - String() string - NodeId() *encoding.NodeId - Location() StorageLocation -} diff --git a/interfaces/vote.go b/interfaces/vote.go deleted file mode 100644 index 7f6b3e7..0000000 --- a/interfaces/vote.go +++ /dev/null @@ -1,14 +0,0 @@ -package interfaces - -import "github.com/vmihailenco/msgpack/v5" - -//go:generate mockgen -source=vote.go -destination=../mocks/interfaces/vote.go -package=interfaces - -type NodeVotes interface { - msgpack.CustomEncoder - msgpack.CustomDecoder - Good() int - Bad() int - Upvote() - Downvote() -} diff --git a/node/node.go b/node/node.go index 6a3f893..43beb4e 100644 --- a/node/node.go +++ b/node/node.go @@ -5,7 +5,6 @@ import ( "fmt" "git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" @@ -22,31 +21,29 @@ import ( "time" ) -var _ interfaces.Node = (*NodeImpl)(nil) - const cacheBucketName = "object-cache" -type NodeImpl struct { +type Node struct { nodeConfig *config.NodeConfig metadataCache structs.Map started bool hashQueryRoutingTable structs.Map - services interfaces.Services + services service.Services httpClient *resty.Client connections sync.WaitGroup - providerStore interfaces.ProviderStore + providerStore storage.ProviderStore } -func (n *NodeImpl) NetworkId() string { +func (n *Node) NetworkId() string { return n.nodeConfig.P2P.Network } -func (n *NodeImpl) Services() interfaces.Services { +func (n *Node) Services() service.Services { return n.services } -func NewNode(config *config.NodeConfig) interfaces.Node { - n := &NodeImpl{ +func NewNode(config *config.NodeConfig) *Node { + n := &Node{ nodeConfig: config, metadataCache: structs.NewMap(), started: false, @@ -57,33 +54,33 @@ func NewNode(config *config.NodeConfig) interfaces.Node { return n } -func (n *NodeImpl) HashQueryRoutingTable() structs.Map { +func (n *Node) HashQueryRoutingTable() structs.Map { return n.hashQueryRoutingTable } -func (n *NodeImpl) IsStarted() bool { +func (n *Node) IsStarted() bool { return n.started } -func (n *NodeImpl) Config() *config.NodeConfig { +func (n *Node) Config() *config.NodeConfig { return n.nodeConfig } -func (n *NodeImpl) Logger() *zap.Logger { +func (n *Node) Logger() *zap.Logger { if n.nodeConfig != nil { return n.nodeConfig.Logger } return nil } -func (n *NodeImpl) Db() *bolt.DB { +func (n *Node) Db() *bolt.DB { if n.nodeConfig != nil { return n.nodeConfig.DB } return nil } -func (n *NodeImpl) Start() error { +func (n *Node) Start() error { protocol.Init() signed.Init() err := @@ -109,15 +106,15 @@ func (n *NodeImpl) Start() error { n.started = true return nil } -func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]interfaces.StorageLocation, error) { - locations := make(map[string]interfaces.StorageLocation) +func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { + locations := make(map[string]storage.StorageLocation) locationMap, err := n.readStorageLocationsFromDB(hash) if err != nil { return nil, err } if len(locationMap) == 0 { - return make(map[string]interfaces.StorageLocation), nil + return make(map[string]storage.StorageLocation), nil } ts := time.Now().Unix() @@ -163,7 +160,7 @@ func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []t } return locations, nil } -func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { +func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { var locationMap storage.StorageLocationMap err := n.Db().View(func(tx *bolt.Tx) error { @@ -189,7 +186,7 @@ func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage return locationMap, nil } -func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location interfaces.StorageLocation, message []byte) error { +func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error { // Read existing storage locations locationDb, err := n.readStorageLocationsFromDB(hash) if err != nil { @@ -235,7 +232,7 @@ func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding return nil } -func (n *NodeImpl) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { +func (n *Node) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { // Initialize the download URI provider dlUriProvider := storage.NewStorageLocationProvider(n, hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile) err := dlUriProvider.Start() @@ -271,7 +268,7 @@ func (n *NodeImpl) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) } } -func (n *NodeImpl) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) { +func (n *Node) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) { bytes, err = n.DownloadBytesByHash(&cid.Hash) if err != nil { return nil, err @@ -280,7 +277,7 @@ func (n *NodeImpl) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err erro return bytes, nil } -func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) { +func (n *Node) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) { hashStr, err := cid.Hash.ToString() if err != nil { return nil, err @@ -327,18 +324,18 @@ func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, er return md, nil } -func (n *NodeImpl) WaitOnConnectedPeers() { +func (n *Node) WaitOnConnectedPeers() { n.connections.Wait() } -func (n *NodeImpl) ConnectionTracker() *sync.WaitGroup { +func (n *Node) ConnectionTracker() *sync.WaitGroup { return &n.connections } -func (n *NodeImpl) SetProviderStore(store interfaces.ProviderStore) { +func (n *Node) SetProviderStore(store storage.ProviderStore) { n.providerStore = store } -func (n *NodeImpl) ProviderStore() interfaces.ProviderStore { +func (n *Node) ProviderStore() storage.ProviderStore { return n.providerStore } diff --git a/node/services.go b/node/services.go index cd525eb..df8eebc 100644 --- a/node/services.go +++ b/node/services.go @@ -1,23 +1,25 @@ package node -import "git.lumeweb.com/LumeWeb/libs5-go/interfaces" +import ( + "git.lumeweb.com/LumeWeb/libs5-go/service" +) var ( - _ interfaces.Services = (*ServicesImpl)(nil) + _ service.Services = (*ServicesImpl)(nil) ) type ServicesImpl struct { - p2p interfaces.P2PService - registry interfaces.RegistryService - http interfaces.HTTPService + p2p *service.P2PService + registry *service.RegistryService + http *service.HTTPService } -func (s *ServicesImpl) HTTP() interfaces.HTTPService { +func (s *ServicesImpl) HTTP() *service.HTTPService { return s.http } -func (s *ServicesImpl) All() []interfaces.Service { - services := make([]interfaces.Service, 0) +func (s *ServicesImpl) All() []service.Service { + services := make([]service.Service, 0) services = append(services, s.p2p) services = append(services, s.registry) services = append(services, s.http) @@ -25,11 +27,11 @@ func (s *ServicesImpl) All() []interfaces.Service { return services } -func (s *ServicesImpl) Registry() interfaces.RegistryService { +func (s *ServicesImpl) Registry() *service.RegistryService { return s.registry } -func NewServices(p2p interfaces.P2PService, registry interfaces.RegistryService, http interfaces.HTTPService) interfaces.Services { +func NewServices(p2p *service.P2PService, registry *service.RegistryService, http *service.HTTPService) service.Services { return &ServicesImpl{ p2p: p2p, registry: registry, @@ -37,6 +39,6 @@ func NewServices(p2p interfaces.P2PService, registry interfaces.RegistryService, } } -func (s *ServicesImpl) P2P() interfaces.P2PService { +func (s *ServicesImpl) P2P() *service.P2PService { return s.p2p } diff --git a/protocol/base/base.go b/protocol/base/base.go index a096b99..7da85d5 100644 --- a/protocol/base/base.go +++ b/protocol/base/base.go @@ -24,7 +24,7 @@ type IncomingMessageData struct { Original []byte Data []byte Ctx context.Context - Node *node.NodeImpl + Node *node.Node Peer net.Peer VerifyId bool } diff --git a/protocol/registry.go b/protocol/registry.go index 524e45f..2e03237 100644 --- a/protocol/registry.go +++ b/protocol/registry.go @@ -4,16 +4,31 @@ import ( ed25519p "crypto/ed25519" "errors" "git.lumeweb.com/LumeWeb/libs5-go/ed25519" - "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/utils" ) var ( - _ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil) - _ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil) + _ SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil) + _ SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil) ) +type SignedRegistryEntry interface { + PK() []byte + Revision() uint64 + Data() []byte + Signature() []byte + SetPK(pk []byte) + SetRevision(revision uint64) + SetData(data []byte) + SetSignature(signature []byte) + Verify() bool +} + +type RegistryEntry interface { + Sign() SignedRegistryEntry +} + type SignedRegistryEntryImpl struct { pk []byte revision uint64 @@ -57,7 +72,7 @@ func (s *SignedRegistryEntryImpl) SetSignature(signature []byte) { s.signature = signature } -func NewSignedRegistryEntry(pk []byte, revision uint64, data []byte, signature []byte) interfaces.SignedRegistryEntry { +func NewSignedRegistryEntry(pk []byte, revision uint64, data []byte, signature []byte) SignedRegistryEntry { return &SignedRegistryEntryImpl{ pk: pk, revision: revision, @@ -72,7 +87,7 @@ type RegistryEntryImpl struct { revision uint64 } -func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.RegistryEntry { +func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) RegistryEntry { return &RegistryEntryImpl{ kp: kp, data: data, @@ -80,11 +95,11 @@ func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) i } } -func (r *RegistryEntryImpl) Sign() interfaces.SignedRegistryEntry { +func (r *RegistryEntryImpl) Sign() SignedRegistryEntry { return SignRegistryEntry(r.kp, r.data, r.revision) } -func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.SignedRegistryEntry { +func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) SignedRegistryEntry { buffer := MarshalRegistryEntry(data, revision) privateKey := kp.ExtractBytes() @@ -92,14 +107,14 @@ func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) return NewSignedRegistryEntry(kp.PublicKey(), uint64(revision), data, signature) } -func VerifyRegistryEntry(sre interfaces.SignedRegistryEntry) bool { +func VerifyRegistryEntry(sre SignedRegistryEntry) bool { buffer := MarshalRegistryEntry(sre.Data(), sre.Revision()) publicKey := sre.PK()[1:] return ed25519p.Verify(publicKey, buffer, sre.Signature()) } -func MarshalSignedRegistryEntry(sre interfaces.SignedRegistryEntry) []byte { +func MarshalSignedRegistryEntry(sre SignedRegistryEntry) []byte { buffer := MarshalRegistryEntry(sre.Data(), sre.Revision()) buffer = append(buffer, sre.Signature()...) @@ -118,7 +133,7 @@ func MarshalRegistryEntry(data []byte, revision uint64) []byte { return buffer } -func UnmarshalSignedRegistryEntry(event []byte) (sre interfaces.SignedRegistryEntry, err error) { +func UnmarshalSignedRegistryEntry(event []byte) (sre SignedRegistryEntry, err error) { if len(event) < 43 { return nil, errors.New("Invalid registry entry") } diff --git a/protocol/registry_entry.go b/protocol/registry_entry.go index e9912aa..5577f6f 100644 --- a/protocol/registry_entry.go +++ b/protocol/registry_entry.go @@ -1,7 +1,6 @@ package protocol import ( - "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" @@ -11,7 +10,7 @@ var _ base.IncomingMessage = (*RegistryEntryRequest)(nil) var _ base.EncodeableMessage = (*RegistryEntryRequest)(nil) type RegistryEntryRequest struct { - sre interfaces.SignedRegistryEntry + sre SignedRegistryEntry base.HandshakeRequirement } @@ -22,7 +21,7 @@ func NewEmptyRegistryEntryRequest() *RegistryEntryRequest { return rer } -func NewRegistryEntryRequest(sre interfaces.SignedRegistryEntry) *RegistryEntryRequest { +func NewRegistryEntryRequest(sre SignedRegistryEntry) *RegistryEntryRequest { return &RegistryEntryRequest{sre: sre} } diff --git a/service/http.go b/service/http.go index fa3c270..14aed62 100644 --- a/service/http.go +++ b/service/http.go @@ -2,8 +2,8 @@ package service import ( "git.lumeweb.com/LumeWeb/libs5-go/build" - "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/net" + _node "git.lumeweb.com/LumeWeb/libs5-go/node" "github.com/julienschmidt/httprouter" "go.sia.tech/jape" "go.uber.org/zap" @@ -11,7 +11,7 @@ import ( "nhooyr.io/websocket" ) -var _ interfaces.HTTPService = (*HTTPImpl)(nil) +var _ Service = (*HTTPService)(nil) type P2PNodesResponse struct { Nodes []P2PNodeResponse `json:"nodes"` @@ -22,17 +22,17 @@ type P2PNodeResponse struct { Uris []string `json:"uris"` } -type HTTPImpl struct { - node interfaces.Node +type HTTPService struct { + node *_node.Node } -func NewHTTP(node interfaces.Node) interfaces.HTTPService { - return &HTTPImpl{ +func NewHTTP(node *_node.Node) *HTTPService { + return &HTTPService{ node: node, } } -func (h *HTTPImpl) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router { +func (h *HTTPService) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router { routes := map[string]jape.Handler{ "GET /s5/version": h.versionHandler, "GET /s5/p2p": h.p2pHandler, @@ -46,26 +46,26 @@ func (h *HTTPImpl) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Rou return jape.Mux(routes) } -func (h *HTTPImpl) Node() interfaces.Node { +func (h *HTTPService) Node() *_node.Node { return h.node } -func (h *HTTPImpl) Start() error { +func (h *HTTPService) Start() error { return nil } -func (h *HTTPImpl) Stop() error { +func (h *HTTPService) Stop() error { return nil } -func (h *HTTPImpl) Init() error { +func (h *HTTPService) Init() error { return nil } -func (h *HTTPImpl) versionHandler(ctx jape.Context) { +func (h *HTTPService) versionHandler(ctx jape.Context) { _, _ = ctx.ResponseWriter.Write([]byte(build.Version)) } -func (h *HTTPImpl) p2pHandler(ctx jape.Context) { +func (h *HTTPService) p2pHandler(ctx jape.Context) { c, err := websocket.Accept(ctx.ResponseWriter, ctx.Request, nil) if err != nil { h.node.Logger().Error("error accepting websocket connection", zap.Error(err)) @@ -97,7 +97,7 @@ func (h *HTTPImpl) p2pHandler(ctx jape.Context) { }() } -func (h *HTTPImpl) p2pNodesHandler(ctx jape.Context) { +func (h *HTTPService) p2pNodesHandler(ctx jape.Context) { localId, err := h.node.Services().P2P().NodeId().ToString() if ctx.Check("error getting local node id", err) != nil { diff --git a/service/p2p.go b/service/p2p.go index 9ce07d0..20244d1 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -8,12 +8,12 @@ import ( "fmt" "git.lumeweb.com/LumeWeb/libs5-go/ed25519" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/node" + _node "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" + "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/utils" @@ -26,8 +26,7 @@ import ( "time" ) -var _ interfaces.P2PService = (*P2PImpl)(nil) -var _ interfaces.NodeVotes = (*NodeVotesImpl)(nil) +var _ Service = (*P2PService)(nil) var ( errUnsupportedProtocol = errors.New("unsupported protocol") @@ -36,13 +35,13 @@ var ( const nodeBucketName = "nodes" -type P2PImpl struct { +type P2PService struct { logger *zap.Logger nodeKeyPair *ed25519.KeyPairEd25519 localNodeID *encoding.NodeId networkID string nodesBucket *bolt.Bucket - node interfaces.Node + node *_node.Node inited bool reconnectDelay structs.Map peers structs.Map @@ -55,13 +54,13 @@ type P2PImpl struct { maxOutgoingPeerFailures uint } -func NewP2P(node interfaces.Node) *P2PImpl { +func NewP2P(node *_node.Node) *P2PService { uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", node.Config().HTTP.API.Domain, node.Config().HTTP.API.Port)) if err != nil { node.Logger().Fatal("failed to HTTP API URL Config", zap.Error(err)) } - service := &P2PImpl{ + service := &P2PService{ logger: node.Logger(), nodeKeyPair: node.Config().KeyPair, networkID: node.Config().P2P.Network, @@ -81,19 +80,19 @@ func NewP2P(node interfaces.Node) *P2PImpl { return service } -func (p *P2PImpl) SelfConnectionUris() []*url.URL { +func (p *P2PService) SelfConnectionUris() []*url.URL { return p.selfConnectionUris } -func (p *P2PImpl) Node() interfaces.Node { +func (p *P2PService) Node() *_node.Node { return p.node } -func (p *P2PImpl) Peers() structs.Map { +func (p *P2PService) Peers() structs.Map { return p.peers } -func (p *P2PImpl) Start() error { +func (p *P2PService) Start() error { config := p.Node().Config() if len(config.P2P.Peers.Initial) > 0 { initialPeers := config.P2P.Peers.Initial @@ -117,11 +116,11 @@ func (p *P2PImpl) Start() error { return nil } -func (p *P2PImpl) Stop() error { +func (p *P2PService) Stop() error { panic("implement me") } -func (p *P2PImpl) Init() error { +func (p *P2PService) Init() error { if p.inited { return nil } @@ -137,7 +136,7 @@ func (p *P2PImpl) Init() error { return nil } -func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error { +func (p *P2PService) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error { if !p.Node().IsStarted() { return nil } @@ -343,7 +342,7 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool, fromPee } -func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error { +func (p *P2PService) OnNewPeer(peer net.Peer, verifyId bool) error { var wg sync.WaitGroup var pid string @@ -400,7 +399,7 @@ func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error { p.logger.Debug("OnNewPeer ended", zap.String("peer", pid)) return nil } -func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) { +func (p *P2PService) OnNewPeerListen(peer net.Peer, verifyId bool) { onDone := net.CloseCallback(func() { if peer.Id() != nil { pid, err := peer.Id().ToString() @@ -444,7 +443,7 @@ func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) { Original: message, Data: reader.Data, Ctx: context.Background(), - Node: p.node.(*node.NodeImpl), + Node: p.node, Peer: peer, VerifyId: verifyId, } @@ -471,7 +470,7 @@ func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) { }) } -func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, error) { +func (p *P2PService) readNodeVotes(nodeId *encoding.NodeId) (NodeVotes, error) { var value []byte var found bool err := p.node.Db().View(func(tx *bolt.Tx) error { @@ -493,7 +492,7 @@ func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, return NewNodeVotes(), nil } - var score interfaces.NodeVotes + var score NodeVotes err = msgpack.Unmarshal(value, &score) if err != nil { return nil, err @@ -502,7 +501,7 @@ func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, return score, nil } -func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId, votes interfaces.NodeVotes) error { +func (p *P2PService) saveNodeVotes(nodeId *encoding.NodeId, votes NodeVotes) error { // Marshal the votes into data data, err := msgpack.Marshal(votes) if err != nil { @@ -521,7 +520,7 @@ func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId, votes interfaces.NodeVo return err } -func (p *P2PImpl) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { +func (p *P2PService) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { if nodeId.Equals(p.localNodeID) { return 1, nil } @@ -534,7 +533,7 @@ func (p *P2PImpl) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { return protocol.CalculateNodeScore(score.Good(), score.Bad()), nil } -func (p *P2PImpl) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) { +func (p *P2PService) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) { scores := make(map[encoding.NodeIdCode]float64) var errOccurred error @@ -554,7 +553,7 @@ func (p *P2PImpl) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId return nodes, errOccurred } -func (p *P2PImpl) SignMessageSimple(message []byte) ([]byte, error) { +func (p *P2PService) SignMessageSimple(message []byte) ([]byte, error) { signedMessage := signed.NewSignedMessageRequest(message) signedMessage.SetNodeId(p.localNodeID) @@ -573,7 +572,7 @@ func (p *P2PImpl) SignMessageSimple(message []byte) ([]byte, error) { return result, nil } -func (p *P2PImpl) AddPeer(peer net.Peer) error { +func (p *P2PService) AddPeer(peer net.Peer) error { peerId, err := peer.Id().ToString() if err != nil { return err @@ -587,7 +586,7 @@ func (p *P2PImpl) AddPeer(peer net.Peer) error { return nil } -func (p *P2PImpl) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error { +func (p *P2PService) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error { announceRequest := signed.NewAnnounceRequest(peer, peersToSend) message, err := msgpack.Marshal(announceRequest) @@ -606,7 +605,7 @@ func (p *P2PImpl) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) e return nil } -func (p *P2PImpl) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error { +func (p *P2PService) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error { hashRequest := protocol.NewHashRequest(hash, kinds) message, err := msgpack.Marshal(hashRequest) if err != nil { @@ -625,7 +624,7 @@ func (p *P2PImpl) SendHashRequest(hash *encoding.Multihash, kinds []types.Storag return nil } -func (p *P2PImpl) UpVote(nodeId *encoding.NodeId) error { +func (p *P2PService) UpVote(nodeId *encoding.NodeId) error { err := p.vote(nodeId, true) if err != nil { return err @@ -634,7 +633,7 @@ func (p *P2PImpl) UpVote(nodeId *encoding.NodeId) error { return nil } -func (p *P2PImpl) DownVote(nodeId *encoding.NodeId) error { +func (p *P2PService) DownVote(nodeId *encoding.NodeId) error { err := p.vote(nodeId, false) if err != nil { return err @@ -643,7 +642,7 @@ func (p *P2PImpl) DownVote(nodeId *encoding.NodeId) error { return nil } -func (p *P2PImpl) vote(nodeId *encoding.NodeId, upvote bool) error { +func (p *P2PService) vote(nodeId *encoding.NodeId, upvote bool) error { votes, err := p.readNodeVotes(nodeId) if err != nil { return err @@ -662,11 +661,11 @@ func (p *P2PImpl) vote(nodeId *encoding.NodeId, upvote bool) error { return nil } -func (p *P2PImpl) NodeId() *encoding.NodeId { +func (p *P2PService) NodeId() *encoding.NodeId { return p.localNodeID } -func (p *P2PImpl) PrepareProvideMessage(hash *encoding.Multihash, location interfaces.StorageLocation) []byte { +func (p *P2PService) PrepareProvideMessage(hash *encoding.Multihash, location storage.StorageLocation) []byte { // Initialize the list with the record type. list := []byte{byte(types.RecordTypeStorageLocation)} diff --git a/service/registry.go b/service/registry.go index 5b0050c..7d7727f 100644 --- a/service/registry.go +++ b/service/registry.go @@ -3,8 +3,8 @@ package service import ( "errors" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/net" + _node "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" @@ -16,42 +16,44 @@ import ( "time" ) -var _ interfaces.RegistryService = (*RegistryImpl)(nil) - const registryBucketName = "registry" -type RegistryImpl struct { - node interfaces.Node +var ( + _ Service = (*RegistryService)(nil) +) + +type RegistryService struct { + node *_node.Node logger *zap.Logger streams structs.Map subs structs.Map } -func (r *RegistryImpl) Node() interfaces.Node { +func (r *RegistryService) Node() *_node.Node { return r.node } -func (r *RegistryImpl) Start() error { +func (r *RegistryService) Start() error { return nil } -func (r *RegistryImpl) Stop() error { +func (r *RegistryService) Stop() error { return nil } -func (r *RegistryImpl) Init() error { +func (r *RegistryService) Init() error { return utils.CreateBucket(registryBucketName, r.node.Db()) } -func NewRegistry(node interfaces.Node) *RegistryImpl { - return &RegistryImpl{ +func NewRegistry(node *_node.Node) *RegistryService { + return &RegistryService{ node: node, logger: node.Logger(), streams: structs.NewMap(), subs: structs.NewMap(), } } -func (r *RegistryImpl) Set(sre interfaces.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error { +func (r *RegistryService) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error { hash := encoding.NewMultihash(sre.PK()) hashString, err := hash.ToString() if err != nil { @@ -138,7 +140,7 @@ func (r *RegistryImpl) Set(sre interfaces.SignedRegistryEntry, trusted bool, rec return nil } -func (r *RegistryImpl) BroadcastEntry(sre interfaces.SignedRegistryEntry, receivedFrom net.Peer) error { +func (r *RegistryService) BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error { hash := encoding.NewMultihash(sre.PK()) hashString, err := hash.ToString() if err != nil { @@ -171,7 +173,7 @@ func (r *RegistryImpl) BroadcastEntry(sre interfaces.SignedRegistryEntry, receiv return nil } -func (r *RegistryImpl) SendRegistryRequest(pk []byte) error { +func (r *RegistryService) SendRegistryRequest(pk []byte) error { query := protocol.NewRegistryQuery(pk) request, err := msgpack.Marshal(query) @@ -198,7 +200,7 @@ func (r *RegistryImpl) SendRegistryRequest(pk []byte) error { return nil } -func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) { +func (r *RegistryService) Get(pk []byte) (protocol.SignedRegistryEntry, error) { key := encoding.NewMultihash(pk) keyString, err := key.ToString() if err != nil { @@ -259,14 +261,14 @@ func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) { return nil, nil } -func (r *RegistryImpl) Listen(pk []byte, cb func(sre interfaces.SignedRegistryEntry)) (func(), error) { +func (r *RegistryService) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) { key, err := encoding.NewMultihash(pk).ToString() if err != nil { return nil, err } cbProxy := func(event *emitter.Event) { - sre, ok := event.Args[0].(interfaces.SignedRegistryEntry) + sre, ok := event.Args[0].(protocol.SignedRegistryEntry) if !ok { r.logger.Error("Failed to cast event to SignedRegistryEntry") return @@ -292,7 +294,7 @@ func (r *RegistryImpl) Listen(pk []byte, cb func(sre interfaces.SignedRegistryEn }, nil } -func (r *RegistryImpl) getFromDB(pk []byte) (sre interfaces.SignedRegistryEntry, err error) { +func (r *RegistryService) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) { err = r.node.Db().View(func(txn *bbolt.Tx) error { bucket := txn.Bucket([]byte(registryBucketName)) val := bucket.Get(pk) diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..2781539 --- /dev/null +++ b/service/service.go @@ -0,0 +1,16 @@ +package service + +import "git.lumeweb.com/LumeWeb/libs5-go/node" + +type Service interface { + Node() *node.Node + Start() error + Stop() error + Init() error +} +type Services interface { + P2P() *P2PService + Registry() *RegistryService + HTTP() *HTTPService + All() []Service +} diff --git a/service/vote.go b/service/vote.go index a1f3a31..e205c08 100644 --- a/service/vote.go +++ b/service/vote.go @@ -1,12 +1,11 @@ package service import ( - "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "github.com/vmihailenco/msgpack/v5" ) var ( - _ interfaces.NodeVotes = (*NodeVotesImpl)(nil) + _ NodeVotes = (*NodeVotesImpl)(nil) _ msgpack.CustomDecoder = (*NodeVotesImpl)(nil) _ msgpack.CustomEncoder = (*NodeVotesImpl)(nil) ) @@ -16,7 +15,7 @@ type NodeVotesImpl struct { bad int } -func NewNodeVotes() interfaces.NodeVotes { +func NewNodeVotes() NodeVotes { return &NodeVotesImpl{ good: 0, bad: 0, @@ -69,3 +68,12 @@ func (n *NodeVotesImpl) Upvote() { func (n *NodeVotesImpl) Downvote() { n.bad++ } + +type NodeVotes interface { + msgpack.CustomEncoder + msgpack.CustomDecoder + Good() int + Bad() int + Upvote() + Downvote() +} diff --git a/storage/storage.go b/storage/storage.go index c31d8db..9b0f78d 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -14,11 +14,11 @@ import ( ) var ( - _ msgpack.CustomDecoder = (*StorageLocationMap)(nil) - _ msgpack.CustomEncoder = (*StorageLocationMap)(nil) - _ interfaces.StorageLocation = (*StorageLocationImpl)(nil) - _ interfaces.StorageLocationProvider = (*StorageLocationProviderImpl)(nil) - _ interfaces.SignedStorageLocation = (*SignedStorageLocationImpl)(nil) + _ msgpack.CustomDecoder = (*StorageLocationMap)(nil) + _ msgpack.CustomEncoder = (*StorageLocationMap)(nil) + _ StorageLocation = (*StorageLocationImpl)(nil) + _ StorageLocationProvider = (*StorageLocationProviderImpl)(nil) + _ SignedStorageLocation = (*SignedStorageLocationImpl)(nil) ) type StorageLocationMap map[int]NodeStorage @@ -73,7 +73,7 @@ func (s *StorageLocationImpl) ProviderMessage() []byte { return s.providerMessage } -func NewStorageLocation(Type int, Parts []string, Expiry int64) interfaces.StorageLocation { +func NewStorageLocation(Type int, Parts []string, Expiry int64) StorageLocation { return &StorageLocationImpl{ kind: Type, parts: Parts, @@ -99,10 +99,10 @@ func (s *StorageLocationImpl) String() string { type SignedStorageLocationImpl struct { nodeID *encoding.NodeId - location interfaces.StorageLocation + location StorageLocation } -func NewSignedStorageLocation(NodeID *encoding.NodeId, Location interfaces.StorageLocation) interfaces.SignedStorageLocation { +func NewSignedStorageLocation(NodeID *encoding.NodeId, Location StorageLocation) SignedStorageLocation { return &SignedStorageLocationImpl{ nodeID: NodeID, location: Location, @@ -122,7 +122,7 @@ func (ssl *SignedStorageLocationImpl) String() string { func (ssl *SignedStorageLocationImpl) NodeId() *encoding.NodeId { return ssl.nodeID } -func (ssl *SignedStorageLocationImpl) Location() interfaces.StorageLocation { +func (ssl *SignedStorageLocationImpl) Location() StorageLocation { return ssl.location } @@ -189,7 +189,7 @@ type StorageLocationProviderImpl struct { types []types.StorageLocationType timeoutDuration time.Duration availableNodes []*encoding.NodeId - uris map[string]interfaces.StorageLocation + uris map[string]StorageLocation timeout time.Time isTimedOut bool isWaitingForUri bool @@ -283,7 +283,7 @@ func (s *StorageLocationProviderImpl) Start() error { }() return nil } -func (s *StorageLocationProviderImpl) Next() (interfaces.SignedStorageLocation, error) { +func (s *StorageLocationProviderImpl) Next() (SignedStorageLocation, error) { s.timeout = time.Now().Add(s.timeoutDuration) for { @@ -319,7 +319,7 @@ func (s *StorageLocationProviderImpl) Next() (interfaces.SignedStorageLocation, } } -func (s *StorageLocationProviderImpl) Upvote(uri interfaces.SignedStorageLocation) error { +func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error { err := s.node.Services().P2P().UpVote(uri.NodeId()) if err != nil { return err @@ -328,7 +328,7 @@ func (s *StorageLocationProviderImpl) Upvote(uri interfaces.SignedStorageLocatio return nil } -func (s *StorageLocationProviderImpl) Downvote(uri interfaces.SignedStorageLocation) error { +func (s *StorageLocationProviderImpl) Downvote(uri SignedStorageLocation) error { err := s.node.Services().P2P().DownVote(uri.NodeId()) if err != nil { return err @@ -336,7 +336,7 @@ func (s *StorageLocationProviderImpl) Downvote(uri interfaces.SignedStorageLocat return nil } -func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash, locationTypes ...types.StorageLocationType) interfaces.StorageLocationProvider { +func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash, locationTypes ...types.StorageLocationType) StorageLocationProvider { if locationTypes == nil { locationTypes = []types.StorageLocationType{ types.StorageLocationTypeFull, @@ -348,7 +348,7 @@ func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash, hash: hash, types: locationTypes, timeoutDuration: 60 * time.Second, - uris: make(map[string]interfaces.StorageLocation), + uris: make(map[string]StorageLocation), } } func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool { @@ -359,3 +359,36 @@ func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool { } return false } + +type StorageLocationProvider interface { + Start() error + Next() (SignedStorageLocation, error) + Upvote(uri SignedStorageLocation) error + Downvote(uri SignedStorageLocation) error +} + +type StorageLocation interface { + BytesURL() string + OutboardBytesURL() string + String() string + ProviderMessage() []byte + Type() int + Parts() []string + BinaryParts() [][]byte + Expiry() int64 + SetProviderMessage(msg []byte) + SetType(t int) + SetParts(p []string) + SetBinaryParts(bp [][]byte) + SetExpiry(e int64) +} +type SignedStorageLocation interface { + String() string + NodeId() *encoding.NodeId + Location() StorageLocation +} + +type ProviderStore interface { + CanProvide(hash *encoding.Multihash, kind []types.StorageLocationType) bool + Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (StorageLocation, error) +}