refactor: remove dedicated interfaces and minimize interfaces

This commit is contained in:
Derrick Hammer 2024-01-28 23:59:43 -05:00
parent 31ccfb8c0b
commit a0dcc52d63
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
20 changed files with 206 additions and 323 deletions

View File

@ -1,13 +0,0 @@
package interfaces
import (
"github.com/julienschmidt/httprouter"
"go.sia.tech/jape"
)
//go:generate mockgen -source=http.go -destination=../mocks/interfaces/http.go -package=interfaces
type HTTPService interface {
Service
GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router
}

View File

@ -1,7 +0,0 @@
package interfaces
//go:generate mockgen -source=meta.go -destination=../mocks/interfaces/meta.go -package=interfaces
type Metadata interface {
ToJson() map[string]interface{}
}

View File

@ -1,34 +0,0 @@
package interfaces
import (
"git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"sync"
)
//go:generate mockgen -source=node.go -destination=../mocks/interfaces/node.go -package=interfaces
type Node interface {
Services() Services
HashQueryRoutingTable() structs.Map
IsStarted() bool
Config() *config.NodeConfig
Logger() *zap.Logger
Db() *bolt.DB
Start() error
GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]StorageLocation, error)
AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location StorageLocation, message []byte) error
NetworkId() string
DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error)
DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error)
GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error)
WaitOnConnectedPeers()
ConnectionTracker() *sync.WaitGroup
SetProviderStore(store ProviderStore)
ProviderStore() ProviderStore
}

View File

@ -1,30 +0,0 @@
package interfaces
import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"net/url"
)
//go:generate mockgen -source=p2p.go -destination=../mocks/interfaces/p2p.go -package=interfaces
type P2PService interface {
Peers() structs.Map
ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error
OnNewPeer(peer net.Peer, verifyId bool) error
OnNewPeerListen(peer net.Peer, verifyId bool)
GetNodeScore(nodeId *encoding.NodeId) (float64, error)
SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error)
SignMessageSimple(message []byte) ([]byte, error)
AddPeer(peer net.Peer) error
SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error
SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error
UpVote(nodeId *encoding.NodeId) error
DownVote(nodeId *encoding.NodeId) error
NodeId() *encoding.NodeId
SelfConnectionUris() []*url.URL
PrepareProvideMessage(hash *encoding.Multihash, location StorageLocation) []byte
Service
}

View File

@ -1,11 +0,0 @@
package interfaces
import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/types"
)
type ProviderStore interface {
CanProvide(hash *encoding.Multihash, kind []types.StorageLocationType) bool
Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (StorageLocation, error)
}

View File

@ -1,30 +0,0 @@
package interfaces
import "git.lumeweb.com/LumeWeb/libs5-go/net"
//go:generate mockgen -source=registry.go -destination=../mocks/interfaces/registry.go -package=interfaces
type SignedRegistryEntry interface {
PK() []byte
Revision() uint64
Data() []byte
Signature() []byte
SetPK(pk []byte)
SetRevision(revision uint64)
SetData(data []byte)
SetSignature(signature []byte)
Verify() bool
}
type RegistryEntry interface {
Sign() SignedRegistryEntry
}
type RegistryService interface {
Set(sre SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error
Get(pk []byte) (SignedRegistryEntry, error)
BroadcastEntry(sre SignedRegistryEntry, receivedFrom net.Peer) error
SendRegistryRequest(pk []byte) error
Listen(pk []byte, cb func(sre SignedRegistryEntry)) (func(), error)
Service
}

View File

@ -1,16 +0,0 @@
package interfaces
//go:generate mockgen -source=service.go -destination=../mocks/interfaces/service.go -package=interfaces
type Service interface {
Node() Node
Start() error
Stop() error
Init() error
}
type Services interface {
P2P() P2PService
Registry() RegistryService
HTTP() HTTPService
All() []Service
}

View File

@ -1,33 +0,0 @@
package interfaces
import "git.lumeweb.com/LumeWeb/libs5-go/encoding"
//go:generate mockgen -source=storage.go -destination=../mocks/interfaces/storage.go -package=interfaces
type StorageLocationProvider interface {
Start() error
Next() (SignedStorageLocation, error)
Upvote(uri SignedStorageLocation) error
Downvote(uri SignedStorageLocation) error
}
type StorageLocation interface {
BytesURL() string
OutboardBytesURL() string
String() string
ProviderMessage() []byte
Type() int
Parts() []string
BinaryParts() [][]byte
Expiry() int64
SetProviderMessage(msg []byte)
SetType(t int)
SetParts(p []string)
SetBinaryParts(bp [][]byte)
SetExpiry(e int64)
}
type SignedStorageLocation interface {
String() string
NodeId() *encoding.NodeId
Location() StorageLocation
}

View File

@ -1,14 +0,0 @@
package interfaces
import "github.com/vmihailenco/msgpack/v5"
//go:generate mockgen -source=vote.go -destination=../mocks/interfaces/vote.go -package=interfaces
type NodeVotes interface {
msgpack.CustomEncoder
msgpack.CustomDecoder
Good() int
Bad() int
Upvote()
Downvote()
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
@ -22,31 +21,29 @@ import (
"time"
)
var _ interfaces.Node = (*NodeImpl)(nil)
const cacheBucketName = "object-cache"
type NodeImpl struct {
type Node struct {
nodeConfig *config.NodeConfig
metadataCache structs.Map
started bool
hashQueryRoutingTable structs.Map
services interfaces.Services
services service.Services
httpClient *resty.Client
connections sync.WaitGroup
providerStore interfaces.ProviderStore
providerStore storage.ProviderStore
}
func (n *NodeImpl) NetworkId() string {
func (n *Node) NetworkId() string {
return n.nodeConfig.P2P.Network
}
func (n *NodeImpl) Services() interfaces.Services {
func (n *Node) Services() service.Services {
return n.services
}
func NewNode(config *config.NodeConfig) interfaces.Node {
n := &NodeImpl{
func NewNode(config *config.NodeConfig) *Node {
n := &Node{
nodeConfig: config,
metadataCache: structs.NewMap(),
started: false,
@ -57,33 +54,33 @@ func NewNode(config *config.NodeConfig) interfaces.Node {
return n
}
func (n *NodeImpl) HashQueryRoutingTable() structs.Map {
func (n *Node) HashQueryRoutingTable() structs.Map {
return n.hashQueryRoutingTable
}
func (n *NodeImpl) IsStarted() bool {
func (n *Node) IsStarted() bool {
return n.started
}
func (n *NodeImpl) Config() *config.NodeConfig {
func (n *Node) Config() *config.NodeConfig {
return n.nodeConfig
}
func (n *NodeImpl) Logger() *zap.Logger {
func (n *Node) Logger() *zap.Logger {
if n.nodeConfig != nil {
return n.nodeConfig.Logger
}
return nil
}
func (n *NodeImpl) Db() *bolt.DB {
func (n *Node) Db() *bolt.DB {
if n.nodeConfig != nil {
return n.nodeConfig.DB
}
return nil
}
func (n *NodeImpl) Start() error {
func (n *Node) Start() error {
protocol.Init()
signed.Init()
err :=
@ -109,15 +106,15 @@ func (n *NodeImpl) Start() error {
n.started = true
return nil
}
func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]interfaces.StorageLocation, error) {
locations := make(map[string]interfaces.StorageLocation)
func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) {
locations := make(map[string]storage.StorageLocation)
locationMap, err := n.readStorageLocationsFromDB(hash)
if err != nil {
return nil, err
}
if len(locationMap) == 0 {
return make(map[string]interfaces.StorageLocation), nil
return make(map[string]storage.StorageLocation), nil
}
ts := time.Now().Unix()
@ -163,7 +160,7 @@ func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []t
}
return locations, nil
}
func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
var locationMap storage.StorageLocationMap
err := n.Db().View(func(tx *bolt.Tx) error {
@ -189,7 +186,7 @@ func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage
return locationMap, nil
}
func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location interfaces.StorageLocation, message []byte) error {
func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error {
// Read existing storage locations
locationDb, err := n.readStorageLocationsFromDB(hash)
if err != nil {
@ -235,7 +232,7 @@ func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding
return nil
}
func (n *NodeImpl) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
func (n *Node) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
// Initialize the download URI provider
dlUriProvider := storage.NewStorageLocationProvider(n, hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile)
err := dlUriProvider.Start()
@ -271,7 +268,7 @@ func (n *NodeImpl) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error)
}
}
func (n *NodeImpl) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) {
func (n *Node) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) {
bytes, err = n.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
@ -280,7 +277,7 @@ func (n *NodeImpl) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err erro
return bytes, nil
}
func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) {
func (n *Node) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) {
hashStr, err := cid.Hash.ToString()
if err != nil {
return nil, err
@ -327,18 +324,18 @@ func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, er
return md, nil
}
func (n *NodeImpl) WaitOnConnectedPeers() {
func (n *Node) WaitOnConnectedPeers() {
n.connections.Wait()
}
func (n *NodeImpl) ConnectionTracker() *sync.WaitGroup {
func (n *Node) ConnectionTracker() *sync.WaitGroup {
return &n.connections
}
func (n *NodeImpl) SetProviderStore(store interfaces.ProviderStore) {
func (n *Node) SetProviderStore(store storage.ProviderStore) {
n.providerStore = store
}
func (n *NodeImpl) ProviderStore() interfaces.ProviderStore {
func (n *Node) ProviderStore() storage.ProviderStore {
return n.providerStore
}

View File

@ -1,23 +1,25 @@
package node
import "git.lumeweb.com/LumeWeb/libs5-go/interfaces"
import (
"git.lumeweb.com/LumeWeb/libs5-go/service"
)
var (
_ interfaces.Services = (*ServicesImpl)(nil)
_ service.Services = (*ServicesImpl)(nil)
)
type ServicesImpl struct {
p2p interfaces.P2PService
registry interfaces.RegistryService
http interfaces.HTTPService
p2p *service.P2PService
registry *service.RegistryService
http *service.HTTPService
}
func (s *ServicesImpl) HTTP() interfaces.HTTPService {
func (s *ServicesImpl) HTTP() *service.HTTPService {
return s.http
}
func (s *ServicesImpl) All() []interfaces.Service {
services := make([]interfaces.Service, 0)
func (s *ServicesImpl) All() []service.Service {
services := make([]service.Service, 0)
services = append(services, s.p2p)
services = append(services, s.registry)
services = append(services, s.http)
@ -25,11 +27,11 @@ func (s *ServicesImpl) All() []interfaces.Service {
return services
}
func (s *ServicesImpl) Registry() interfaces.RegistryService {
func (s *ServicesImpl) Registry() *service.RegistryService {
return s.registry
}
func NewServices(p2p interfaces.P2PService, registry interfaces.RegistryService, http interfaces.HTTPService) interfaces.Services {
func NewServices(p2p *service.P2PService, registry *service.RegistryService, http *service.HTTPService) service.Services {
return &ServicesImpl{
p2p: p2p,
registry: registry,
@ -37,6 +39,6 @@ func NewServices(p2p interfaces.P2PService, registry interfaces.RegistryService,
}
}
func (s *ServicesImpl) P2P() interfaces.P2PService {
func (s *ServicesImpl) P2P() *service.P2PService {
return s.p2p
}

View File

@ -24,7 +24,7 @@ type IncomingMessageData struct {
Original []byte
Data []byte
Ctx context.Context
Node *node.NodeImpl
Node *node.Node
Peer net.Peer
VerifyId bool
}

View File

@ -4,16 +4,31 @@ import (
ed25519p "crypto/ed25519"
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
)
var (
_ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil)
_ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil)
_ SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil)
_ SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil)
)
type SignedRegistryEntry interface {
PK() []byte
Revision() uint64
Data() []byte
Signature() []byte
SetPK(pk []byte)
SetRevision(revision uint64)
SetData(data []byte)
SetSignature(signature []byte)
Verify() bool
}
type RegistryEntry interface {
Sign() SignedRegistryEntry
}
type SignedRegistryEntryImpl struct {
pk []byte
revision uint64
@ -57,7 +72,7 @@ func (s *SignedRegistryEntryImpl) SetSignature(signature []byte) {
s.signature = signature
}
func NewSignedRegistryEntry(pk []byte, revision uint64, data []byte, signature []byte) interfaces.SignedRegistryEntry {
func NewSignedRegistryEntry(pk []byte, revision uint64, data []byte, signature []byte) SignedRegistryEntry {
return &SignedRegistryEntryImpl{
pk: pk,
revision: revision,
@ -72,7 +87,7 @@ type RegistryEntryImpl struct {
revision uint64
}
func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.RegistryEntry {
func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) RegistryEntry {
return &RegistryEntryImpl{
kp: kp,
data: data,
@ -80,11 +95,11 @@ func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) i
}
}
func (r *RegistryEntryImpl) Sign() interfaces.SignedRegistryEntry {
func (r *RegistryEntryImpl) Sign() SignedRegistryEntry {
return SignRegistryEntry(r.kp, r.data, r.revision)
}
func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.SignedRegistryEntry {
func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) SignedRegistryEntry {
buffer := MarshalRegistryEntry(data, revision)
privateKey := kp.ExtractBytes()
@ -92,14 +107,14 @@ func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64)
return NewSignedRegistryEntry(kp.PublicKey(), uint64(revision), data, signature)
}
func VerifyRegistryEntry(sre interfaces.SignedRegistryEntry) bool {
func VerifyRegistryEntry(sre SignedRegistryEntry) bool {
buffer := MarshalRegistryEntry(sre.Data(), sre.Revision())
publicKey := sre.PK()[1:]
return ed25519p.Verify(publicKey, buffer, sre.Signature())
}
func MarshalSignedRegistryEntry(sre interfaces.SignedRegistryEntry) []byte {
func MarshalSignedRegistryEntry(sre SignedRegistryEntry) []byte {
buffer := MarshalRegistryEntry(sre.Data(), sre.Revision())
buffer = append(buffer, sre.Signature()...)
@ -118,7 +133,7 @@ func MarshalRegistryEntry(data []byte, revision uint64) []byte {
return buffer
}
func UnmarshalSignedRegistryEntry(event []byte) (sre interfaces.SignedRegistryEntry, err error) {
func UnmarshalSignedRegistryEntry(event []byte) (sre SignedRegistryEntry, err error) {
if len(event) < 43 {
return nil, errors.New("Invalid registry entry")
}

View File

@ -1,7 +1,6 @@
package protocol
import (
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5"
@ -11,7 +10,7 @@ var _ base.IncomingMessage = (*RegistryEntryRequest)(nil)
var _ base.EncodeableMessage = (*RegistryEntryRequest)(nil)
type RegistryEntryRequest struct {
sre interfaces.SignedRegistryEntry
sre SignedRegistryEntry
base.HandshakeRequirement
}
@ -22,7 +21,7 @@ func NewEmptyRegistryEntryRequest() *RegistryEntryRequest {
return rer
}
func NewRegistryEntryRequest(sre interfaces.SignedRegistryEntry) *RegistryEntryRequest {
func NewRegistryEntryRequest(sre SignedRegistryEntry) *RegistryEntryRequest {
return &RegistryEntryRequest{sre: sre}
}

View File

@ -2,8 +2,8 @@ package service
import (
"git.lumeweb.com/LumeWeb/libs5-go/build"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/net"
_node "git.lumeweb.com/LumeWeb/libs5-go/node"
"github.com/julienschmidt/httprouter"
"go.sia.tech/jape"
"go.uber.org/zap"
@ -11,7 +11,7 @@ import (
"nhooyr.io/websocket"
)
var _ interfaces.HTTPService = (*HTTPImpl)(nil)
var _ Service = (*HTTPService)(nil)
type P2PNodesResponse struct {
Nodes []P2PNodeResponse `json:"nodes"`
@ -22,17 +22,17 @@ type P2PNodeResponse struct {
Uris []string `json:"uris"`
}
type HTTPImpl struct {
node interfaces.Node
type HTTPService struct {
node *_node.Node
}
func NewHTTP(node interfaces.Node) interfaces.HTTPService {
return &HTTPImpl{
func NewHTTP(node *_node.Node) *HTTPService {
return &HTTPService{
node: node,
}
}
func (h *HTTPImpl) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router {
func (h *HTTPService) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router {
routes := map[string]jape.Handler{
"GET /s5/version": h.versionHandler,
"GET /s5/p2p": h.p2pHandler,
@ -46,26 +46,26 @@ func (h *HTTPImpl) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Rou
return jape.Mux(routes)
}
func (h *HTTPImpl) Node() interfaces.Node {
func (h *HTTPService) Node() *_node.Node {
return h.node
}
func (h *HTTPImpl) Start() error {
func (h *HTTPService) Start() error {
return nil
}
func (h *HTTPImpl) Stop() error {
func (h *HTTPService) Stop() error {
return nil
}
func (h *HTTPImpl) Init() error {
func (h *HTTPService) Init() error {
return nil
}
func (h *HTTPImpl) versionHandler(ctx jape.Context) {
func (h *HTTPService) versionHandler(ctx jape.Context) {
_, _ = ctx.ResponseWriter.Write([]byte(build.Version))
}
func (h *HTTPImpl) p2pHandler(ctx jape.Context) {
func (h *HTTPService) p2pHandler(ctx jape.Context) {
c, err := websocket.Accept(ctx.ResponseWriter, ctx.Request, nil)
if err != nil {
h.node.Logger().Error("error accepting websocket connection", zap.Error(err))
@ -97,7 +97,7 @@ func (h *HTTPImpl) p2pHandler(ctx jape.Context) {
}()
}
func (h *HTTPImpl) p2pNodesHandler(ctx jape.Context) {
func (h *HTTPService) p2pNodesHandler(ctx jape.Context) {
localId, err := h.node.Services().P2P().NodeId().ToString()
if ctx.Check("error getting local node id", err) != nil {

View File

@ -8,12 +8,12 @@ import (
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/node"
_node "git.lumeweb.com/LumeWeb/libs5-go/node"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
@ -26,8 +26,7 @@ import (
"time"
)
var _ interfaces.P2PService = (*P2PImpl)(nil)
var _ interfaces.NodeVotes = (*NodeVotesImpl)(nil)
var _ Service = (*P2PService)(nil)
var (
errUnsupportedProtocol = errors.New("unsupported protocol")
@ -36,13 +35,13 @@ var (
const nodeBucketName = "nodes"
type P2PImpl struct {
type P2PService struct {
logger *zap.Logger
nodeKeyPair *ed25519.KeyPairEd25519
localNodeID *encoding.NodeId
networkID string
nodesBucket *bolt.Bucket
node interfaces.Node
node *_node.Node
inited bool
reconnectDelay structs.Map
peers structs.Map
@ -55,13 +54,13 @@ type P2PImpl struct {
maxOutgoingPeerFailures uint
}
func NewP2P(node interfaces.Node) *P2PImpl {
func NewP2P(node *_node.Node) *P2PService {
uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", node.Config().HTTP.API.Domain, node.Config().HTTP.API.Port))
if err != nil {
node.Logger().Fatal("failed to HTTP API URL Config", zap.Error(err))
}
service := &P2PImpl{
service := &P2PService{
logger: node.Logger(),
nodeKeyPair: node.Config().KeyPair,
networkID: node.Config().P2P.Network,
@ -81,19 +80,19 @@ func NewP2P(node interfaces.Node) *P2PImpl {
return service
}
func (p *P2PImpl) SelfConnectionUris() []*url.URL {
func (p *P2PService) SelfConnectionUris() []*url.URL {
return p.selfConnectionUris
}
func (p *P2PImpl) Node() interfaces.Node {
func (p *P2PService) Node() *_node.Node {
return p.node
}
func (p *P2PImpl) Peers() structs.Map {
func (p *P2PService) Peers() structs.Map {
return p.peers
}
func (p *P2PImpl) Start() error {
func (p *P2PService) Start() error {
config := p.Node().Config()
if len(config.P2P.Peers.Initial) > 0 {
initialPeers := config.P2P.Peers.Initial
@ -117,11 +116,11 @@ func (p *P2PImpl) Start() error {
return nil
}
func (p *P2PImpl) Stop() error {
func (p *P2PService) Stop() error {
panic("implement me")
}
func (p *P2PImpl) Init() error {
func (p *P2PService) Init() error {
if p.inited {
return nil
}
@ -137,7 +136,7 @@ func (p *P2PImpl) Init() error {
return nil
}
func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error {
func (p *P2PService) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error {
if !p.Node().IsStarted() {
return nil
}
@ -343,7 +342,7 @@ func (p *P2PImpl) ConnectToNode(connectionUris []*url.URL, retried bool, fromPee
}
func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error {
func (p *P2PService) OnNewPeer(peer net.Peer, verifyId bool) error {
var wg sync.WaitGroup
var pid string
@ -400,7 +399,7 @@ func (p *P2PImpl) OnNewPeer(peer net.Peer, verifyId bool) error {
p.logger.Debug("OnNewPeer ended", zap.String("peer", pid))
return nil
}
func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) {
func (p *P2PService) OnNewPeerListen(peer net.Peer, verifyId bool) {
onDone := net.CloseCallback(func() {
if peer.Id() != nil {
pid, err := peer.Id().ToString()
@ -444,7 +443,7 @@ func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) {
Original: message,
Data: reader.Data,
Ctx: context.Background(),
Node: p.node.(*node.NodeImpl),
Node: p.node,
Peer: peer,
VerifyId: verifyId,
}
@ -471,7 +470,7 @@ func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) {
})
}
func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, error) {
func (p *P2PService) readNodeVotes(nodeId *encoding.NodeId) (NodeVotes, error) {
var value []byte
var found bool
err := p.node.Db().View(func(tx *bolt.Tx) error {
@ -493,7 +492,7 @@ func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes,
return NewNodeVotes(), nil
}
var score interfaces.NodeVotes
var score NodeVotes
err = msgpack.Unmarshal(value, &score)
if err != nil {
return nil, err
@ -502,7 +501,7 @@ func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes,
return score, nil
}
func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId, votes interfaces.NodeVotes) error {
func (p *P2PService) saveNodeVotes(nodeId *encoding.NodeId, votes NodeVotes) error {
// Marshal the votes into data
data, err := msgpack.Marshal(votes)
if err != nil {
@ -521,7 +520,7 @@ func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId, votes interfaces.NodeVo
return err
}
func (p *P2PImpl) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {
func (p *P2PService) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {
if nodeId.Equals(p.localNodeID) {
return 1, nil
}
@ -534,7 +533,7 @@ func (p *P2PImpl) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {
return protocol.CalculateNodeScore(score.Good(), score.Bad()), nil
}
func (p *P2PImpl) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) {
func (p *P2PService) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) {
scores := make(map[encoding.NodeIdCode]float64)
var errOccurred error
@ -554,7 +553,7 @@ func (p *P2PImpl) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId
return nodes, errOccurred
}
func (p *P2PImpl) SignMessageSimple(message []byte) ([]byte, error) {
func (p *P2PService) SignMessageSimple(message []byte) ([]byte, error) {
signedMessage := signed.NewSignedMessageRequest(message)
signedMessage.SetNodeId(p.localNodeID)
@ -573,7 +572,7 @@ func (p *P2PImpl) SignMessageSimple(message []byte) ([]byte, error) {
return result, nil
}
func (p *P2PImpl) AddPeer(peer net.Peer) error {
func (p *P2PService) AddPeer(peer net.Peer) error {
peerId, err := peer.Id().ToString()
if err != nil {
return err
@ -587,7 +586,7 @@ func (p *P2PImpl) AddPeer(peer net.Peer) error {
return nil
}
func (p *P2PImpl) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error {
func (p *P2PService) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error {
announceRequest := signed.NewAnnounceRequest(peer, peersToSend)
message, err := msgpack.Marshal(announceRequest)
@ -606,7 +605,7 @@ func (p *P2PImpl) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) e
return nil
}
func (p *P2PImpl) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error {
func (p *P2PService) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error {
hashRequest := protocol.NewHashRequest(hash, kinds)
message, err := msgpack.Marshal(hashRequest)
if err != nil {
@ -625,7 +624,7 @@ func (p *P2PImpl) SendHashRequest(hash *encoding.Multihash, kinds []types.Storag
return nil
}
func (p *P2PImpl) UpVote(nodeId *encoding.NodeId) error {
func (p *P2PService) UpVote(nodeId *encoding.NodeId) error {
err := p.vote(nodeId, true)
if err != nil {
return err
@ -634,7 +633,7 @@ func (p *P2PImpl) UpVote(nodeId *encoding.NodeId) error {
return nil
}
func (p *P2PImpl) DownVote(nodeId *encoding.NodeId) error {
func (p *P2PService) DownVote(nodeId *encoding.NodeId) error {
err := p.vote(nodeId, false)
if err != nil {
return err
@ -643,7 +642,7 @@ func (p *P2PImpl) DownVote(nodeId *encoding.NodeId) error {
return nil
}
func (p *P2PImpl) vote(nodeId *encoding.NodeId, upvote bool) error {
func (p *P2PService) vote(nodeId *encoding.NodeId, upvote bool) error {
votes, err := p.readNodeVotes(nodeId)
if err != nil {
return err
@ -662,11 +661,11 @@ func (p *P2PImpl) vote(nodeId *encoding.NodeId, upvote bool) error {
return nil
}
func (p *P2PImpl) NodeId() *encoding.NodeId {
func (p *P2PService) NodeId() *encoding.NodeId {
return p.localNodeID
}
func (p *P2PImpl) PrepareProvideMessage(hash *encoding.Multihash, location interfaces.StorageLocation) []byte {
func (p *P2PService) PrepareProvideMessage(hash *encoding.Multihash, location storage.StorageLocation) []byte {
// Initialize the list with the record type.
list := []byte{byte(types.RecordTypeStorageLocation)}

View File

@ -3,8 +3,8 @@ package service
import (
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/net"
_node "git.lumeweb.com/LumeWeb/libs5-go/node"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
@ -16,42 +16,44 @@ import (
"time"
)
var _ interfaces.RegistryService = (*RegistryImpl)(nil)
const registryBucketName = "registry"
type RegistryImpl struct {
node interfaces.Node
var (
_ Service = (*RegistryService)(nil)
)
type RegistryService struct {
node *_node.Node
logger *zap.Logger
streams structs.Map
subs structs.Map
}
func (r *RegistryImpl) Node() interfaces.Node {
func (r *RegistryService) Node() *_node.Node {
return r.node
}
func (r *RegistryImpl) Start() error {
func (r *RegistryService) Start() error {
return nil
}
func (r *RegistryImpl) Stop() error {
func (r *RegistryService) Stop() error {
return nil
}
func (r *RegistryImpl) Init() error {
func (r *RegistryService) Init() error {
return utils.CreateBucket(registryBucketName, r.node.Db())
}
func NewRegistry(node interfaces.Node) *RegistryImpl {
return &RegistryImpl{
func NewRegistry(node *_node.Node) *RegistryService {
return &RegistryService{
node: node,
logger: node.Logger(),
streams: structs.NewMap(),
subs: structs.NewMap(),
}
}
func (r *RegistryImpl) Set(sre interfaces.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error {
func (r *RegistryService) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
@ -138,7 +140,7 @@ func (r *RegistryImpl) Set(sre interfaces.SignedRegistryEntry, trusted bool, rec
return nil
}
func (r *RegistryImpl) BroadcastEntry(sre interfaces.SignedRegistryEntry, receivedFrom net.Peer) error {
func (r *RegistryService) BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
@ -171,7 +173,7 @@ func (r *RegistryImpl) BroadcastEntry(sre interfaces.SignedRegistryEntry, receiv
return nil
}
func (r *RegistryImpl) SendRegistryRequest(pk []byte) error {
func (r *RegistryService) SendRegistryRequest(pk []byte) error {
query := protocol.NewRegistryQuery(pk)
request, err := msgpack.Marshal(query)
@ -198,7 +200,7 @@ func (r *RegistryImpl) SendRegistryRequest(pk []byte) error {
return nil
}
func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) {
func (r *RegistryService) Get(pk []byte) (protocol.SignedRegistryEntry, error) {
key := encoding.NewMultihash(pk)
keyString, err := key.ToString()
if err != nil {
@ -259,14 +261,14 @@ func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) {
return nil, nil
}
func (r *RegistryImpl) Listen(pk []byte, cb func(sre interfaces.SignedRegistryEntry)) (func(), error) {
func (r *RegistryService) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) {
key, err := encoding.NewMultihash(pk).ToString()
if err != nil {
return nil, err
}
cbProxy := func(event *emitter.Event) {
sre, ok := event.Args[0].(interfaces.SignedRegistryEntry)
sre, ok := event.Args[0].(protocol.SignedRegistryEntry)
if !ok {
r.logger.Error("Failed to cast event to SignedRegistryEntry")
return
@ -292,7 +294,7 @@ func (r *RegistryImpl) Listen(pk []byte, cb func(sre interfaces.SignedRegistryEn
}, nil
}
func (r *RegistryImpl) getFromDB(pk []byte) (sre interfaces.SignedRegistryEntry, err error) {
func (r *RegistryService) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) {
err = r.node.Db().View(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
val := bucket.Get(pk)

16
service/service.go Normal file
View File

@ -0,0 +1,16 @@
package service
import "git.lumeweb.com/LumeWeb/libs5-go/node"
type Service interface {
Node() *node.Node
Start() error
Stop() error
Init() error
}
type Services interface {
P2P() *P2PService
Registry() *RegistryService
HTTP() *HTTPService
All() []Service
}

View File

@ -1,12 +1,11 @@
package service
import (
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"github.com/vmihailenco/msgpack/v5"
)
var (
_ interfaces.NodeVotes = (*NodeVotesImpl)(nil)
_ NodeVotes = (*NodeVotesImpl)(nil)
_ msgpack.CustomDecoder = (*NodeVotesImpl)(nil)
_ msgpack.CustomEncoder = (*NodeVotesImpl)(nil)
)
@ -16,7 +15,7 @@ type NodeVotesImpl struct {
bad int
}
func NewNodeVotes() interfaces.NodeVotes {
func NewNodeVotes() NodeVotes {
return &NodeVotesImpl{
good: 0,
bad: 0,
@ -69,3 +68,12 @@ func (n *NodeVotesImpl) Upvote() {
func (n *NodeVotesImpl) Downvote() {
n.bad++
}
type NodeVotes interface {
msgpack.CustomEncoder
msgpack.CustomDecoder
Good() int
Bad() int
Upvote()
Downvote()
}

View File

@ -14,11 +14,11 @@ import (
)
var (
_ msgpack.CustomDecoder = (*StorageLocationMap)(nil)
_ msgpack.CustomEncoder = (*StorageLocationMap)(nil)
_ interfaces.StorageLocation = (*StorageLocationImpl)(nil)
_ interfaces.StorageLocationProvider = (*StorageLocationProviderImpl)(nil)
_ interfaces.SignedStorageLocation = (*SignedStorageLocationImpl)(nil)
_ msgpack.CustomDecoder = (*StorageLocationMap)(nil)
_ msgpack.CustomEncoder = (*StorageLocationMap)(nil)
_ StorageLocation = (*StorageLocationImpl)(nil)
_ StorageLocationProvider = (*StorageLocationProviderImpl)(nil)
_ SignedStorageLocation = (*SignedStorageLocationImpl)(nil)
)
type StorageLocationMap map[int]NodeStorage
@ -73,7 +73,7 @@ func (s *StorageLocationImpl) ProviderMessage() []byte {
return s.providerMessage
}
func NewStorageLocation(Type int, Parts []string, Expiry int64) interfaces.StorageLocation {
func NewStorageLocation(Type int, Parts []string, Expiry int64) StorageLocation {
return &StorageLocationImpl{
kind: Type,
parts: Parts,
@ -99,10 +99,10 @@ func (s *StorageLocationImpl) String() string {
type SignedStorageLocationImpl struct {
nodeID *encoding.NodeId
location interfaces.StorageLocation
location StorageLocation
}
func NewSignedStorageLocation(NodeID *encoding.NodeId, Location interfaces.StorageLocation) interfaces.SignedStorageLocation {
func NewSignedStorageLocation(NodeID *encoding.NodeId, Location StorageLocation) SignedStorageLocation {
return &SignedStorageLocationImpl{
nodeID: NodeID,
location: Location,
@ -122,7 +122,7 @@ func (ssl *SignedStorageLocationImpl) String() string {
func (ssl *SignedStorageLocationImpl) NodeId() *encoding.NodeId {
return ssl.nodeID
}
func (ssl *SignedStorageLocationImpl) Location() interfaces.StorageLocation {
func (ssl *SignedStorageLocationImpl) Location() StorageLocation {
return ssl.location
}
@ -189,7 +189,7 @@ type StorageLocationProviderImpl struct {
types []types.StorageLocationType
timeoutDuration time.Duration
availableNodes []*encoding.NodeId
uris map[string]interfaces.StorageLocation
uris map[string]StorageLocation
timeout time.Time
isTimedOut bool
isWaitingForUri bool
@ -283,7 +283,7 @@ func (s *StorageLocationProviderImpl) Start() error {
}()
return nil
}
func (s *StorageLocationProviderImpl) Next() (interfaces.SignedStorageLocation, error) {
func (s *StorageLocationProviderImpl) Next() (SignedStorageLocation, error) {
s.timeout = time.Now().Add(s.timeoutDuration)
for {
@ -319,7 +319,7 @@ func (s *StorageLocationProviderImpl) Next() (interfaces.SignedStorageLocation,
}
}
func (s *StorageLocationProviderImpl) Upvote(uri interfaces.SignedStorageLocation) error {
func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error {
err := s.node.Services().P2P().UpVote(uri.NodeId())
if err != nil {
return err
@ -328,7 +328,7 @@ func (s *StorageLocationProviderImpl) Upvote(uri interfaces.SignedStorageLocatio
return nil
}
func (s *StorageLocationProviderImpl) Downvote(uri interfaces.SignedStorageLocation) error {
func (s *StorageLocationProviderImpl) Downvote(uri SignedStorageLocation) error {
err := s.node.Services().P2P().DownVote(uri.NodeId())
if err != nil {
return err
@ -336,7 +336,7 @@ func (s *StorageLocationProviderImpl) Downvote(uri interfaces.SignedStorageLocat
return nil
}
func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash, locationTypes ...types.StorageLocationType) interfaces.StorageLocationProvider {
func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash, locationTypes ...types.StorageLocationType) StorageLocationProvider {
if locationTypes == nil {
locationTypes = []types.StorageLocationType{
types.StorageLocationTypeFull,
@ -348,7 +348,7 @@ func NewStorageLocationProvider(node interfaces.Node, hash *encoding.Multihash,
hash: hash,
types: locationTypes,
timeoutDuration: 60 * time.Second,
uris: make(map[string]interfaces.StorageLocation),
uris: make(map[string]StorageLocation),
}
}
func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool {
@ -359,3 +359,36 @@ func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool {
}
return false
}
type StorageLocationProvider interface {
Start() error
Next() (SignedStorageLocation, error)
Upvote(uri SignedStorageLocation) error
Downvote(uri SignedStorageLocation) error
}
type StorageLocation interface {
BytesURL() string
OutboardBytesURL() string
String() string
ProviderMessage() []byte
Type() int
Parts() []string
BinaryParts() [][]byte
Expiry() int64
SetProviderMessage(msg []byte)
SetType(t int)
SetParts(p []string)
SetBinaryParts(bp [][]byte)
SetExpiry(e int64)
}
type SignedStorageLocation interface {
String() string
NodeId() *encoding.NodeId
Location() StorageLocation
}
type ProviderStore interface {
CanProvide(hash *encoding.Multihash, kind []types.StorageLocationType) bool
Provide(hash *encoding.Multihash, kind []types.StorageLocationType) (StorageLocation, error)
}