diff --git a/encoding/cid_test.go b/encoding/cid_test.go index c5328b6..396940b 100644 --- a/encoding/cid_test.go +++ b/encoding/cid_test.go @@ -116,7 +116,7 @@ func TestCID_HashCode(t *testing.T) { println(utils.DecodeEndian(testdata.RawCIDBytes[35:])) return type fields struct { - Type types.CIDType + kind types.CIDType Hash Multihash Size uint32 } @@ -128,7 +128,7 @@ func TestCID_HashCode(t *testing.T) { { name: "Bridge CID", fields: fields{ - Type: types.CIDTypeBridge, + kind: types.CIDTypeBridge, Hash: NewMultibase(), // Replace with a valid hash value }, want: , // Replace with the expected byte output for Bridge CID @@ -136,7 +136,7 @@ func TestCID_HashCode(t *testing.T) { { name: "Raw CID with Non-Zero Size", fields: fields{ - Type: types.CIDTypeRaw, + kind: types.CIDTypeRaw, Hash: *NewMultibase(testdata.RawCIDBytes[1:34]), Size: utils.DecodeEndian(testdata.RawCIDBytes[34:]), }, @@ -145,7 +145,7 @@ func TestCID_HashCode(t *testing.T) { { name: "Raw CID with Zero Size", fields: fields{ - Type: types.CIDTypeRaw, + kind: types.CIDTypeRaw, Hash: yourHashValue, // Replace with a valid hash value Size: 0, // Zero size }, @@ -154,7 +154,7 @@ func TestCID_HashCode(t *testing.T) { { name: "Default CID", fields: fields{ - Type: types.CIDTypeDefault, + kind: types.CIDTypeDefault, Hash: yourHashValue, // Replace with a valid hash value }, want: yourExpectedBytesForDefaultCID, // Replace with the expected byte output for Default CID @@ -163,7 +163,7 @@ func TestCID_HashCode(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cid := &CID{ - Type: tt.fields.Type, + kind: tt.fields.kind, Hash: tt.fields.Hash, Size: tt.fields.Size, } diff --git a/interfaces/meta.go b/interfaces/meta.go new file mode 100644 index 0000000..82589a1 --- /dev/null +++ b/interfaces/meta.go @@ -0,0 +1,5 @@ +package interfaces + +type Metadata interface { + ToJson() map[string]interface{} +} diff --git a/interfaces/node.go b/interfaces/node.go new file mode 100644 index 0000000..876671e --- /dev/null +++ b/interfaces/node.go @@ -0,0 +1,21 @@ +package interfaces + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/config" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +type Node interface { + Services() *Services + HashQueryRoutingTable() *structs.Map + IsStarted() bool + Config() *config.NodeConfig + Logger() *zap.Logger + Db() *bolt.DB + Start() error + GetCachedStorageLocations(hash *encoding.Multihash, types []int) (map[string]*StorageLocation, error) + AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location *StorageLocation, message []byte, config *config.NodeConfig) error +} diff --git a/interfaces/p2p.go b/interfaces/p2p.go new file mode 100644 index 0000000..d5f2c6e --- /dev/null +++ b/interfaces/p2p.go @@ -0,0 +1,22 @@ +package interfaces + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "net/url" +) + +type P2PService interface { + Node() *Node + Peers() *structs.Map + Start() error + Stop() error + Init() error + ConnectToNode(connectionUris []*url.URL, retried bool) error + onNewPeer(peer *net.Peer, verifyId bool) error + onNewPeerListen(peer *net.Peer, verifyId bool) + readNodeScore(nodeId *encoding.NodeId) (NodeVotes, error) + getNodeScore(nodeId *encoding.NodeId) (float64, error) + SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) +} diff --git a/interfaces/service.go b/interfaces/service.go new file mode 100644 index 0000000..b63199d --- /dev/null +++ b/interfaces/service.go @@ -0,0 +1,11 @@ +package interfaces + +type Service interface { + Node() *Node + Start() error + Stop() error + Init() error +} +type Services interface { + P2P() *P2PService +} diff --git a/interfaces/storage.go b/interfaces/storage.go new file mode 100644 index 0000000..6552b3f --- /dev/null +++ b/interfaces/storage.go @@ -0,0 +1,20 @@ +package interfaces + +type StorageLocation interface { + BytesURL() string + OutboardBytesURL() string + String() string + ProviderMessage() []byte + Type() int + Parts() []string + BinaryParts() [][]byte + Expiry() int64 + SetProviderMessage(msg []byte) + SetType(t int) + SetParts(p []string) + SetBinaryParts(bp [][]byte) + SetExpiry(e int64) +} +type SignedStorageLocation interface { + String() string +} diff --git a/interfaces/vote.go b/interfaces/vote.go new file mode 100644 index 0000000..bca7df4 --- /dev/null +++ b/interfaces/vote.go @@ -0,0 +1,8 @@ +package interfaces + +import "github.com/vmihailenco/msgpack/v5" + +type NodeVotes interface { + msgpack.CustomEncoder + msgpack.CustomDecoder +} diff --git a/metadata/directory_map.go b/metadata/directory_map.go index 7af8156..5169dce 100644 --- a/metadata/directory_map.go +++ b/metadata/directory_map.go @@ -179,7 +179,7 @@ func unmarshalMapJson(bytes []byte, m *linkedhashmap.Map, newInstance unmarshalN return err } - // Type switch to handle different types + // kind switch to handle different types switch v := instance.(type) { case *DirectoryReference: m.Put(key, *v) diff --git a/node/node.go b/node/node.go index 5de22bc..e940a7f 100644 --- a/node/node.go +++ b/node/node.go @@ -3,7 +3,7 @@ package node import ( "git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/service" + "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/utils" "github.com/vmihailenco/msgpack/v5" @@ -12,68 +12,58 @@ import ( "time" ) -type Metadata interface { - ToJson() map[string]interface{} -} - -type Services struct { - p2p *service.P2P -} - -func (s *Services) P2P() *service.P2P { - return s.p2p -} +var _ interfaces.Node = (*NodeImpl)(nil) const cacheBucketName = "object-cache" -type Node struct { +type NodeImpl struct { nodeConfig *config.NodeConfig metadataCache *structs.Map started bool hashQueryRoutingTable *structs.Map - services Services + services interfaces.Services cacheBucket *bolt.Bucket } -func (n *Node) Services() *Services { +func (n *NodeImpl) Services() *interfaces.Services { return &n.services } -func NewNode(config *config.NodeConfig) *Node { - return &Node{ +func NewNode(config *config.NodeConfig) *NodeImpl { + return &NodeImpl{ nodeConfig: config, metadataCache: structs.NewMap(), started: false, hashQueryRoutingTable: structs.NewMap(), } } -func (n *Node) HashQueryRoutingTable() *structs.Map { +func (n *NodeImpl) HashQueryRoutingTable() *structs.Map { return n.hashQueryRoutingTable } -func (n *Node) IsStarted() bool { +func (n *NodeImpl) IsStarted() bool { return n.started } -func (n *Node) Config() *config.NodeConfig { +func (n *NodeImpl) Config() *config.NodeConfig { return n.nodeConfig } -func (n *Node) Logger() *zap.Logger { +func (n *NodeImpl) Logger() *zap.Logger { if n.nodeConfig != nil { return n.nodeConfig.Logger } return nil } -func (n *Node) Db() *bolt.DB { +func (n *NodeImpl) Db() *bolt.DB { if n.nodeConfig != nil { return n.nodeConfig.DB } return nil } -func (n *Node) Start() error { +func (n *NodeImpl) Start() error { err := utils.CreateBucket(cacheBucketName, n.Db(), func(bucket *bolt.Bucket) { n.cacheBucket = bucket @@ -88,32 +78,32 @@ func (n *Node) Start() error { } /* - func (n *Node) Services() *S5Services { + func (n *NodeImpl) Services() *S5Services { if n.nodeConfig != nil { return n.nodeConfig.Services } return nil } - func (n *Node) Start() error { + func (n *NodeImpl) Start() error { n.started = true return nil } - func (n *Node) Stop() error { + func (n *NodeImpl) Stop() error { n.started = false return nil } */ -func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, types []int) (map[string]*StorageLocation, error) { - locations := make(map[string]*StorageLocation) +func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, types []int) (map[string]*interfaces.StorageLocation, error) { + locations := make(map[string]*interfaces.StorageLocation) locationMap, err := n.readStorageLocationsFromDB(hash) if err != nil { return nil, err } if len(locationMap) == 0 { - return make(map[string]*StorageLocation), nil + return make(map[string]*interfaces.StorageLocation), nil } ts := time.Now().Unix() @@ -143,7 +133,7 @@ func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, types []int) storageLocation := NewStorageLocation(t, addresses, expiry) if len(value) > 4 { if providerMessage, ok := value[4].([]byte); ok { - storageLocation.ProviderMessage = providerMessage + (*storageLocation).SetProviderMessage(providerMessage) } } @@ -152,7 +142,7 @@ func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, types []int) } return locations, nil } -func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storageLocationMap, error) { +func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storageLocationMap, error) { locationMap := newStorageLocationMap() bytes := n.cacheBucket.Get(hash.FullBytes()) @@ -167,7 +157,7 @@ func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storageLoca return locationMap, nil } -func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location *StorageLocation, message []byte, config *config.NodeConfig) error { +func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location *interfaces.StorageLocation, message []byte, config *config.NodeConfig) error { // Read existing storage locations locationDb, err := n.readStorageLocationsFromDB(hash) if err != nil { @@ -180,7 +170,7 @@ func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.Nod } // Get or create the inner map for the specific type - innerMap, exists := locationDb[location.Type] + innerMap, exists := locationDb[(*location).Type()] if !exists { innerMap = make(nodeStorage, 1) innerMap[nodeIdStr] = make(nodeDetailsStorage, 1) @@ -188,13 +178,13 @@ func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.Nod // Create location map with new data locationMap := make(map[int]interface{}, 3) - locationMap[1] = location.Parts - locationMap[3] = location.Expiry + locationMap[1] = (*location).Parts + locationMap[3] = (*location).Expiry locationMap[4] = message // Update the inner map with the new location innerMap[nodeIdStr] = locationMap - locationDb[location.Type] = innerMap + locationDb[(*location).Type()] = innerMap // Serialize the updated map and store it in the database packedBytes, err := msgpack.Marshal(locationDb) @@ -210,7 +200,7 @@ func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.Nod return nil } /* -func (n *Node) DownloadBytesByHash(hash Multihash) ([]byte, error) { +func (n *NodeImpl) DownloadBytesByHash(hash Multihash) ([]byte, error) { dlUriProvider := NewStorageLocationProvider(n, hash, []int{storageLocationTypeFull, storageLocationTypeFile}) dlUriProvider.Start() @@ -258,7 +248,7 @@ func (n *Node) DownloadBytesByHash(hash Multihash) ([]byte, error) { } } -func (n *Node) GetMetadataByCID(cid CID) (Metadata, error) { +func (n *NodeImpl) GetMetadataByCID(cid CID) (Metadata, error) { var metadata Metadata var ok bool @@ -268,7 +258,7 @@ func (n *Node) GetMetadataByCID(cid CID) (Metadata, error) { return Metadata{}, err } - switch cid.Type { + switch cid.kind { case METADATA_MEDIA, BRIDGE: // Both cases use the same deserialization method metadata, err = deserializeMediaMetadata(bytes) case METADATA_WEBAPP: diff --git a/node/storage.go b/node/storage.go index 1fe7452..461ab1f 100644 --- a/node/storage.go +++ b/node/storage.go @@ -3,6 +3,7 @@ package node import ( "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/interfaces" "github.com/vmihailenco/msgpack/v5" "strconv" "time" @@ -10,60 +11,106 @@ import ( var ( _ msgpack.CustomDecoder = (*storageLocationMap)(nil) + + _ msgpack.CustomEncoder = (*storageLocationMap)(nil) + _ interfaces.StorageLocation = (*StorageLocationImpl)(nil) ) -type StorageLocation struct { - Type int - Parts []string - BinaryParts [][]byte - Expiry int64 - ProviderMessage []byte +type StorageLocationImpl struct { + kind int + parts []string + binaryParts [][]byte + expiry int64 + providerMessage []byte } -func NewStorageLocation(Type int, Parts []string, Expiry int64) *StorageLocation { - return &StorageLocation{ - Type: Type, - Parts: Parts, - Expiry: Expiry, +func (s *StorageLocationImpl) Type() int { + return s.kind +} + +func (s *StorageLocationImpl) Parts() []string { + //TODO implement me + panic("implement me") +} + +func (s *StorageLocationImpl) BinaryParts() [][]byte { + return s.binaryParts +} + +func (s *StorageLocationImpl) Expiry() int64 { + return s.expiry +} + +func (s *StorageLocationImpl) SetType(t int) { + s.kind = t +} + +func (s *StorageLocationImpl) SetParts(p []string) { + s.parts = p +} + +func (s *StorageLocationImpl) SetBinaryParts(bp [][]byte) { + s.binaryParts = bp +} + +func (s *StorageLocationImpl) SetExpiry(e int64) { + s.expiry = e +} + +func (s *StorageLocationImpl) SetProviderMessage(msg []byte) { + s.providerMessage = msg +} + +func (s *StorageLocationImpl) ProviderMessage() []byte { + return s.providerMessage +} + +func NewStorageLocation(Type int, Parts []string, Expiry int64) *interfaces.StorageLocation { + sl := &StorageLocationImpl{ + kind: Type, + parts: Parts, + expiry: Expiry, } + var location interfaces.StorageLocation = sl + return &location } -func (s *StorageLocation) BytesURL() string { - return s.Parts[0] +func (s *StorageLocationImpl) BytesURL() string { + return s.parts[0] } -func (s *StorageLocation) OutboardBytesURL() string { - if len(s.Parts) == 1 { - return s.Parts[0] + ".obao" +func (s *StorageLocationImpl) OutboardBytesURL() string { + if len(s.parts) == 1 { + return s.parts[0] + ".obao" } - return s.Parts[1] + return s.parts[1] } -func (s *StorageLocation) String() string { - expiryDate := time.Unix(s.Expiry, 0) - return "StorageLocation(" + strconv.Itoa(s.Type) + ", " + fmt.Sprint(s.Parts) + ", expiry: " + expiryDate.Format(time.RFC3339) + ")" +func (s *StorageLocationImpl) String() string { + expiryDate := time.Unix(s.expiry, 0) + return "StorageLocationImpl(" + strconv.Itoa(s.Type()) + ", " + fmt.Sprint(s.parts) + ", expiry: " + expiryDate.Format(time.RFC3339) + ")" } -type SignedStorageLocation struct { +type SignedStorageLocationImpl struct { NodeID encoding.NodeId - Location StorageLocation + Location StorageLocationImpl } -func NewSignedStorageLocation(NodeID encoding.NodeId, Location StorageLocation) *SignedStorageLocation { - return &SignedStorageLocation{ +func NewSignedStorageLocation(NodeID encoding.NodeId, Location StorageLocationImpl) *SignedStorageLocationImpl { + return &SignedStorageLocationImpl{ NodeID: NodeID, Location: Location, } } -func (ssl *SignedStorageLocation) String() string { +func (ssl *SignedStorageLocationImpl) String() string { nodeString, _ := ssl.NodeID.ToString() if nodeString == "" { nodeString = "failed to decode node id" } - return "SignedStorageLocation(" + ssl.Location.String() + ", " + nodeString + ")" + return "SignedStorageLocationImpl(" + ssl.Location.String() + ", " + nodeString + ")" } type storageLocationMap map[int]nodeStorage diff --git a/protocol/handshake_open.go b/protocol/handshake_open.go index 9e6c2d5..688675a 100644 --- a/protocol/handshake_open.go +++ b/protocol/handshake_open.go @@ -55,7 +55,7 @@ func (m HandshakeOpen) EncodeMsgpack(enc *msgpack.Encoder) error { return nil } -func (m *HandshakeOpen) HandleMessage(node *node.Node, peer *net.Peer, verifyId bool) error { +func (m *HandshakeOpen) HandleMessage(node *node.NodeImpl, peer *net.Peer, verifyId bool) error { return nil } diff --git a/protocol/hash_query.go b/protocol/hash_query.go index 57d45e0..8488553 100644 --- a/protocol/hash_query.go +++ b/protocol/hash_query.go @@ -47,7 +47,7 @@ func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder) error { return nil } -func (h *HashQuery) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { +func (h *HashQuery) HandleMessage(node *libs5_go.NodeImpl, peer *net.Peer, verifyId bool) error { mapLocations, err := node.GetCachedStorageLocations(h.hash, h.kinds) if err != nil { log.Printf("Error getting cached storage locations: %v", err) diff --git a/protocol/incoming_message.go b/protocol/incoming_message.go index b1ebc26..6f74fff 100644 --- a/protocol/incoming_message.go +++ b/protocol/incoming_message.go @@ -14,7 +14,7 @@ var ( ) type IncomingMessage interface { - HandleMessage(node *node.Node, peer *net.Peer, verifyId bool) error + HandleMessage(node *node.NodeImpl, peer *net.Peer, verifyId bool) error SetIncomingMessage(msg IncomingMessage) msgpack.CustomDecoder } @@ -54,7 +54,7 @@ func (i *IncomingMessageImpl) ToMessage() (message []byte, err error) { return msgpack.Marshal(i) } -func (i *IncomingMessageImpl) HandleMessage(node *node.Node, peer *net.Peer, verifyId bool) error { +func (i *IncomingMessageImpl) HandleMessage(node *node.NodeImpl, peer *net.Peer, verifyId bool) error { panic("child class should implement this method") } @@ -89,7 +89,7 @@ func NewIncomingMessageTyped(kind types.ProtocolMethod, data msgpack.RawMessage) return &IncomingMessageTypedImpl{*known} } -type IncomingMessageHandler func(node *node.Node, peer *net.Peer, u *url.URL, verifyId bool) error +type IncomingMessageHandler func(node *node.NodeImpl, peer *net.Peer, u *url.URL, verifyId bool) error func (i *IncomingMessageImpl) DecodeMsgpack(dec *msgpack.Decoder) error { if i.known { diff --git a/protocol/signed/accounce_peers.go b/protocol/signed/accounce_peers.go index ad4b43d..317898e 100644 --- a/protocol/signed/accounce_peers.go +++ b/protocol/signed/accounce_peers.go @@ -60,7 +60,7 @@ func (a *AnnouncePeers) DecodeMessage(dec *msgpack.Decoder) error { return nil } -func (a AnnouncePeers) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { +func (a AnnouncePeers) HandleMessage(node *libs5_go.NodeImpl, peer *net.Peer, verifyId bool) error { if len(a.connectionUris) > 0 { firstUrl := a.connectionUris[0] uri := new(url.URL) diff --git a/protocol/signed/handshake_done.go b/protocol/signed/handshake_done.go index f2bc4a0..b11b24a 100644 --- a/protocol/signed/handshake_done.go +++ b/protocol/signed/handshake_done.go @@ -20,7 +20,7 @@ func NewHandshakeDone() *HandshakeDone { return &HandshakeDone{HandshakeOpen: *protocol.NewHandshakeOpen(nil, ""), supportedFeatures: -1} } -func (h HandshakeDone) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { +func (h HandshakeDone) HandleMessage(node *libs5_go.NodeImpl, peer *net.Peer, verifyId bool) error { if !(*node).IsStarted() { err := (*peer).End() if err != nil { diff --git a/protocol/signed_message.go b/protocol/signed_message.go index 77c12eb..a7c3df6 100644 --- a/protocol/signed_message.go +++ b/protocol/signed_message.go @@ -54,7 +54,7 @@ func NewSignedMessage() *SignedMessage { return &SignedMessage{} } -func (s *SignedMessage) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { +func (s *SignedMessage) HandleMessage(node *libs5_go.NodeImpl, peer *net.Peer, verifyId bool) error { var payload signedMessagePayoad err := msgpack.Unmarshal(s.message, &payload) diff --git a/protocol/storage_location.go b/protocol/storage_location.go index 07117de..69ba6f2 100644 --- a/protocol/storage_location.go +++ b/protocol/storage_location.go @@ -39,7 +39,7 @@ func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder) error { return nil } -func (s *StorageLocation) HandleMessage(node *node.Node, peer *net.Peer, verifyId bool) error { +func (s *StorageLocation) HandleMessage(node *node.NodeImpl, peer *net.Peer, verifyId bool) error { hash := encoding.NewMultihash(s.raw[1:34]) // Replace NewMultihash with appropriate function fmt.Println("Hash:", hash) @@ -72,7 +72,7 @@ func (s *StorageLocation) HandleMessage(node *node.Node, peer *net.Peer, verifyI nodeId := encoding.NewNodeId(publicKey) - // Assuming `node` is an instance of your Node structure + // Assuming `node` is an instance of your NodeImpl structure err := node.AddStorageLocation(hash, nodeId, node.NewStorageLocation(int(typeOfData), parts, int64(expiry)), s.raw, node.Config()) // Implement AddStorageLocation if err != nil { diff --git a/service/p2p.go b/service/p2p.go index f683965..8662096 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -34,7 +34,7 @@ type P2P struct { localNodeID *encoding.NodeId networkID string nodesBucket *bolt.Bucket - node *node.Node + node *node.NodeImpl inited bool reconnectDelay *structs.Map peers *structs.Map @@ -76,7 +76,7 @@ func (n *nodeVotes) DecodeMsgpack(dec *msgpack.Decoder) error { return nil } -func NewP2P(node *node.Node) *P2P { +func NewP2P(node *node.NodeImpl) *P2P { service := &P2P{ logger: node.Logger(), nodeKeyPair: node.Config().KeyPair, @@ -90,7 +90,7 @@ func NewP2P(node *node.Node) *P2P { return service } -func (p *P2P) Node() *node.Node { +func (p *P2P) Node() *node.NodeImpl { return p.node } diff --git a/service/service.go b/service/service.go index d1146f4..c76c122 100644 --- a/service/service.go +++ b/service/service.go @@ -5,7 +5,7 @@ import ( ) type Service interface { - Node() *node.Node + Node() *node.NodeImpl Start() error Stop() error Init() error