feat: wip networking

This commit is contained in:
Derrick Hammer 2024-01-06 06:33:46 -05:00
parent 8d1bdd87ac
commit 8c29a284ce
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
20 changed files with 1444 additions and 0 deletions

22
config.go Normal file
View File

@ -0,0 +1,22 @@
package libs5_go
import (
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
)
type NodeConfig struct {
P2P P2PConfig
KeyPair ed25519.KeyPairEd25519
DB bolt.DB
Logger *zap.Logger
}
type P2PConfig struct {
Network string
Peers PeersConfig
}
type PeersConfig struct {
Initial []string
}

2
go.mod
View File

@ -5,6 +5,7 @@ go 1.20
require (
github.com/emirpasic/gods v1.18.1
github.com/google/go-cmp v0.6.0
github.com/gorilla/websocket v1.5.1
github.com/multiformats/go-multibase v0.2.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.etcd.io/bbolt v1.3.8
@ -17,5 +18,6 @@ require (
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.15.0 // indirect
)

63
net/net.go Normal file
View File

@ -0,0 +1,63 @@
package net
import (
"errors"
"net/url"
"sync"
)
type TransportPeerConfig struct {
Socket interface{}
Uris []*url.URL
}
type PeerStatic interface {
Connect(uri *url.URL) (interface{}, error) // Returns a connection/socket
}
type PeerFactory interface {
NewPeer(options *TransportPeerConfig) (Peer, error)
}
var (
transports sync.Map
)
func init() {
transports = sync.Map{}
//RegisterPeerType("ws", WebSocketPeer)
//RegisterPeerType("wss", WebSocketPeer)
}
func RegisterTransport(peerType string, factory interface{}) {
if _, ok := factory.(PeerFactory); !ok {
panic("factory must implement PeerFactory")
}
if _, ok := factory.(PeerStatic); !ok {
panic("factory must implement PeerStatic")
}
transports.Store(peerType, factory)
}
func CreateTransportSocket(peerType string, uri *url.URL) (interface{}, error) {
static, ok := transports.Load(peerType)
if !ok {
return nil, errors.New("no static method registered for type: " + peerType)
}
t, err := static.(PeerStatic).Connect(uri)
return &t, err
}
func CreateTransportPeer(peerType string, options *TransportPeerConfig) (*Peer, error) {
factory, ok := transports.Load(peerType)
if !ok {
return nil, errors.New("no factory registered for type: " + peerType)
}
t, err := factory.(PeerFactory).NewPeer(options)
return &t, err
}

42
net/peer.go Normal file
View File

@ -0,0 +1,42 @@
package net
import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"go.uber.org/zap"
"net/url"
)
// EventCallback type for the callback function
type EventCallback func(event []byte) error
// DoneCallback type for the onDone callback
type DoneCallback func()
// ErrorCallback type for the onError callback
type ErrorCallback func(args ...interface{})
// ListenerOptions struct for options
type ListenerOptions struct {
OnDone *DoneCallback
OnError *ErrorCallback
Logger *zap.Logger
}
type Peer interface {
SendMessage(message []byte) error
RenderLocationURI() string
ListenForMessages(callback EventCallback, options ListenerOptions)
End() error
SetId(id *encoding.NodeId)
GetId() *encoding.NodeId
SetChallenge(challenge []byte)
GetChallenge() []byte
}
type BasePeer struct {
ConnectionURIs []url.URL
IsConnected bool
challenge []byte
Socket interface{}
Id *encoding.NodeId
}

64
net/ws.go Normal file
View File

@ -0,0 +1,64 @@
package net
import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"github.com/gorilla/websocket"
)
type WebSocketPeer struct {
BasePeer
Socket *websocket.Conn
}
func (p *WebSocketPeer) SendMessage(message []byte) {
err := p.Socket.WriteMessage(websocket.BinaryMessage, message)
if err != nil {
return
}
}
func (p *WebSocketPeer) RenderLocationURI() string {
return p.Socket.RemoteAddr().String()
}
func (p *WebSocketPeer) ListenForMessages(callback EventCallback, onClose func(), onError func(error)) {
for {
_, message, err := p.Socket.ReadMessage()
if err != nil {
if onError != nil {
onError(err)
}
break
}
err = callback(message)
if err != nil {
if onError != nil {
onError(err)
}
}
}
if onClose != nil {
onClose()
}
}
func (p *WebSocketPeer) End() {
err := p.Socket.Close()
if err != nil {
return
}
}
func (p *WebSocketPeer) SetId(id *encoding.NodeId) {
p.Id = id
}
func (p *WebSocketPeer) SetChallenge(challenge []byte) {
p.challenge = challenge
}
func (p *WebSocketPeer) GetChallenge() []byte {
return p.challenge
}

296
node.go Normal file
View File

@ -0,0 +1,296 @@
package libs5_go
import (
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
)
type Metadata interface {
ToJson() map[string]interface{}
}
type services struct {
p2p *service.P2P
}
type Node struct {
nodeConfig *NodeConfig
metadataCache *structs.Map
started bool
hashQueryRoutingTable *structs.Map
services services
}
func NewNode(config *NodeConfig) *Node {
return &Node{
nodeConfig: config,
metadataCache: structs.NewMap(),
started: false,
hashQueryRoutingTable: structs.NewMap(),
}
}
func (n *Node) HashQueryRoutingTable() *structs.Map {
return n.hashQueryRoutingTable
}
func (n *Node) IsStarted() bool {
return n.started
}
func (n *Node) Config() *NodeConfig {
return n.nodeConfig
}
func (n *Node) Logger() *zap.Logger {
if n.nodeConfig != nil {
return n.nodeConfig.Logger
}
return nil
}
func (n *Node) Db() *bolt.DB {
if n.nodeConfig != nil {
return &n.nodeConfig.DB
}
return nil
}
/*
func (n *Node) Services() *S5Services {
if n.nodeConfig != nil {
return n.nodeConfig.Services
}
return nil
}
func (n *Node) Start() error {
n.started = true
return nil
}
func (n *Node) Stop() error {
n.started = false
return nil
}
func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeId]*StorageLocation, error) {
locations := make(map[NodeId]*StorageLocation)
mapFromDB, err := n.readStorageLocationsFromDB(hash)
if err != nil {
return nil, err
}
if len(mapFromDB) == 0 {
return make(map[NodeId]*StorageLocation), nil
}
ts := time.Now().Unix()
for _, t := range types {
nodeMap, ok := mapFromDB[t]
if !ok {
continue
}
for key, value := range nodeMap {
if len(value) < 4 {
continue // or handle error
}
expiry, ok := value[3].(int64)
if !ok || expiry < ts {
continue
}
addresses, ok := value[1].([]string)
if !ok {
continue // or handle error
}
storageLocation := NewStorageLocation(t, addresses, expiry)
if len(value) > 4 {
if providerMessage, ok := value[4].(string); ok {
storageLocation.ProviderMessage = providerMessage
}
}
locations[NodeId(key)] = storageLocation
}
}
return locations, nil
}
func (n *Node) ReadStorageLocationsFromDB(hash Multihash) (map[int]map[NodeId]map[int]interface{}, error) {
locations := make(map[int]map[NodeId]map[int]interface{})
bytes, err := n.config.CacheDb.Get(StringifyHash(hash)) // Assume StringifyHash and CacheDb.Get are implemented
if err != nil {
return locations, nil
}
if bytes == nil {
return locations, nil
}
unpacker := NewUnpacker(bytes) // Assume NewUnpacker is implemented to handle the unpacking
mapLength, err := unpacker.UnpackMapLength()
if err != nil {
return nil, err
}
for i := 0; i < mapLength; i++ {
t, err := unpacker.UnpackInt()
if err != nil {
continue // or handle error
}
innerMap := make(map[NodeId]map[int]interface{})
locations[t] = innerMap
innerMapLength, err := unpacker.UnpackMapLength()
if err != nil {
continue // or handle error
}
for j := 0; j < innerMapLength; j++ {
nodeIdBytes, err := unpacker.UnpackBinary()
if err != nil {
continue // or handle error
}
nodeId := NodeId(nodeIdBytes)
// Assuming unpacker.UnpackMap() returns a map[string]interface{} and is implemented
unpackedMap, err := unpacker.UnpackMap()
if err != nil {
continue // or handle error
}
convertedMap := make(map[int]interface{})
for key, value := range unpackedMap {
intKey, err := strconv.Atoi(key)
if err != nil {
continue // or handle error
}
convertedMap[intKey] = value
}
innerMap[nodeId] = convertedMap
}
}
return locations, nil
}
func (n *Node) AddStorageLocation(hash Multihash, nodeId NodeId, location StorageLocation, message []byte, config S5Config) error {
// Read existing storage locations
mapFromDB, err := n.ReadStorageLocationsFromDB(hash)
if err != nil {
return err
}
// Get or create the inner map for the specific type
innerMap, exists := mapFromDB[location.Type]
if !exists {
innerMap = make(map[NodeId]map[int]interface{})
mapFromDB[location.Type] = innerMap
}
// Create location map with new data
locationMap := make(map[int]interface{})
locationMap[1] = location.Parts
// locationMap[2] = location.BinaryParts // Uncomment if BinaryParts is a field of StorageLocation
locationMap[3] = location.Expiry
locationMap[4] = message
// Update the inner map with the new location
innerMap[nodeId] = locationMap
// Serialize the updated map and store it in the database
packedBytes, err := NewPacker().Pack(mapFromDB) // Assuming NewPacker and Pack are implemented
if err != nil {
return err
}
err = config.CacheDb.Put(StringifyHash(hash), packedBytes) // Assume CacheDb.Put and StringifyHash are implemented
if err != nil {
return err
}
return nil
}
func (n *Node) DownloadBytesByHash(hash Multihash) ([]byte, error) {
dlUriProvider := NewStorageLocationProvider(n, hash, []int{storageLocationTypeFull, storageLocationTypeFile})
dlUriProvider.Start()
retryCount := 0
for {
dlUri, err := dlUriProvider.Next()
if err != nil {
return nil, err
}
n.Logger.Verbose(fmt.Sprintf("[try] %s", dlUri.Location.BytesUrl))
client := &http.Client{
Timeout: 30 * time.Second,
}
res, err := client.Get(dlUri.Location.BytesUrl)
if err != nil {
n.Logger.Catched(err)
dlUriProvider.Downvote(dlUri)
retryCount++
if retryCount > 32 {
return nil, errors.New("too many retries")
}
continue
}
defer res.Body.Close()
data, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
// Assuming blake3 and equalBytes functions are available
resHash := blake3(data)
if !equalBytes(hash.HashBytes, resHash) {
dlUriProvider.Downvote(dlUri)
continue
}
dlUriProvider.Upvote(dlUri)
return data, nil
}
}
func (n *Node) GetMetadataByCID(cid CID) (Metadata, error) {
var metadata Metadata
var ok bool
if metadata, ok = n.MetadataCache[cid.Hash]; !ok {
bytes, err := n.DownloadBytesByHash(cid.Hash)
if err != nil {
return Metadata{}, err
}
switch cid.Type {
case METADATA_MEDIA, BRIDGE: // Both cases use the same deserialization method
metadata, err = deserializeMediaMetadata(bytes)
case METADATA_WEBAPP:
metadata, err = deserializeWebAppMetadata(bytes)
default:
return Metadata{}, errors.New("unsupported metadata format")
}
if err != nil {
return Metadata{}, err
}
n.MetadataCache[cid.Hash] = metadata
}
return metadata, nil
}
*/

View File

@ -0,0 +1,19 @@
package protocol
import "github.com/vmihailenco/msgpack/v5"
type EncodeableMessage interface {
ToMessage() (message []byte, err error)
msgpack.CustomEncoder
}
type EncodeableMessageImpl struct {
}
func (e EncodeableMessageImpl) ToMessage() (message []byte, err error) {
return msgpack.Marshal(e)
}
func (e EncodeableMessageImpl) EncodeMsgpack(encoder *msgpack.Encoder) error {
panic("this method should be implemented by the child class")
}

View File

@ -0,0 +1,61 @@
package protocol
import (
"errors"
"git.lumeweb.com/LumeWeb/libs5-go"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5"
)
type HandshakeOpen struct {
challenge []byte
networkId string
IncomingMessageTypedImpl
IncomingMessageHandler
}
func (m HandshakeOpen) Challenge() []byte {
return m.challenge
}
func (m HandshakeOpen) NetworkId() string {
return m.networkId
}
var _ EncodeableMessage = (*HandshakeOpen)(nil)
var (
errInvalidChallenge = errors.New("Invalid challenge")
)
func NewHandshakeOpen(challenge []byte, networkId string) *HandshakeOpen {
return &HandshakeOpen{
challenge: challenge,
networkId: networkId,
}
}
func (m HandshakeOpen) EncodeMsgpack(enc *msgpack.Encoder) error {
err := enc.EncodeUint(uint64(types.ProtocolMethodHandshakeOpen))
if err != nil {
return err
}
err = enc.EncodeBytes(m.challenge)
if err != nil {
return err
}
if m.networkId != "" {
err = enc.EncodeString(m.networkId)
if err != nil {
return err
}
}
return nil
}
func (m *HandshakeOpen) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
return nil
}

50
protocol/hash_query.go Normal file
View File

@ -0,0 +1,50 @@
package protocol
import (
libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"github.com/vmihailenco/msgpack/v5"
)
var _ IncomingMessageTyped = (*HashQuery)(nil)
type HashQuery struct {
hash *encoding.Multihash
kinds []int
IncomingMessageTypedImpl
IncomingMessageHandler
}
func (h HashQuery) Hash() *encoding.Multihash {
return h.hash
}
func (h HashQuery) Kinds() []int {
return h.kinds
}
func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder) error {
hash, err := dec.DecodeBytes()
if err != nil {
return err
}
h.hash = encoding.NewMultihash(hash)
var kinds []int
err = dec.Decode(&kinds)
if err != nil {
return err
}
h.kinds = kinds
return nil
}
func (h *HashQuery) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
return nil
}

View File

@ -0,0 +1,107 @@
package protocol
import (
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5"
"net/url"
)
var (
_ EncodeableMessage = (*EncodeableMessageImpl)(nil)
)
type IncomingMessage interface {
HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error
SetIncomingMessage(msg IncomingMessage)
msgpack.CustomDecoder
}
type IncomingMessageTyped interface {
DecodeMessage(dec *msgpack.Decoder) error
IncomingMessage
}
type IncomingMessageImpl struct {
kind types.ProtocolMethod
data msgpack.RawMessage
known bool
}
func (i *IncomingMessageImpl) SetIncomingMessage(msg IncomingMessage) {
*i = interface{}(msg).(IncomingMessageImpl)
}
var _ msgpack.CustomDecoder = (*IncomingMessageImpl)(nil)
var _ IncomingMessage = (*IncomingMessageImpl)(nil)
func (i *IncomingMessageImpl) GetKind() types.ProtocolMethod {
return i.kind
}
func (i *IncomingMessageImpl) ToMessage() (message []byte, err error) {
return msgpack.Marshal(i)
}
func (i *IncomingMessageImpl) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
panic("child class should implement this method")
}
func (i *IncomingMessageImpl) Kind() types.ProtocolMethod {
return i.kind
}
func (i *IncomingMessageImpl) Data() msgpack.RawMessage {
return i.data
}
type IncomingMessageTypedImpl struct {
IncomingMessageImpl
}
func NewIncomingMessageUnknown() *IncomingMessageImpl {
return &IncomingMessageImpl{
known: false,
}
}
func NewIncomingMessageKnown(kind types.ProtocolMethod, data msgpack.RawMessage) *IncomingMessageImpl {
return &IncomingMessageImpl{
kind: kind,
data: data,
known: true,
}
}
func NewIncomingMessageTyped(kind types.ProtocolMethod, data msgpack.RawMessage) *IncomingMessageTypedImpl {
known := NewIncomingMessageKnown(kind, data)
return &IncomingMessageTypedImpl{*known}
}
type IncomingMessageHandler func(node *libs5_go.Node, peer *net.Peer, u *url.URL, verifyId bool) error
func (i *IncomingMessageImpl) DecodeMsgpack(dec *msgpack.Decoder) error {
if i.known {
if msgTyped, ok := interface{}(i).(IncomingMessageTyped); ok {
return msgTyped.DecodeMessage(dec)
}
return fmt.Errorf("type assertion to IncomingMessageTyped failed")
}
kind, err := dec.DecodeInt()
if err != nil {
return err
}
i.kind = types.ProtocolMethod(kind)
raw, err := dec.DecodeRaw()
if err != nil {
return err
}
i.data = raw
return nil
}

47
protocol/message.go Normal file
View File

@ -0,0 +1,47 @@
package protocol
import (
"git.lumeweb.com/LumeWeb/libs5-go/types"
"sync"
)
var (
messageTypes sync.Map
)
var (
_ IncomingMessage = (*IncomingMessageImpl)(nil)
)
func init() {
messageTypes = sync.Map{}
// Register factory functions instead of instances
RegisterMessageType(types.ProtocolMethodHandshakeOpen, func() IncomingMessage {
return NewHandshakeOpen([]byte{}, "")
})
RegisterMessageType(types.ProtocolMethodSignedMessage, func() IncomingMessage {
return NewSignedMessage()
})
}
func RegisterMessageType(messageType types.ProtocolMethod, factoryFunc func() IncomingMessage) {
if factoryFunc == nil {
panic("factoryFunc cannot be nil")
}
messageTypes.Store(messageType, factoryFunc)
}
func GetMessageType(kind types.ProtocolMethod) (IncomingMessage, bool) {
value, ok := messageTypes.Load(kind)
if !ok {
return nil, false
}
factoryFunc, ok := value.(func() IncomingMessage)
if !ok {
return nil, false
}
return factoryFunc(), true
}

13
protocol/protocol.go Normal file
View File

@ -0,0 +1,13 @@
package protocol
import "crypto/rand"
func GenerateChallenge() []byte {
challenge := make([]byte, 32)
_, err := rand.Read(challenge)
if err != nil {
panic(err)
}
return challenge
}

View File

@ -0,0 +1,86 @@
package signed
import (
"bytes"
"errors"
libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"github.com/vmihailenco/msgpack/v5"
)
var _ protocol.IncomingMessageTyped = (*HandshakeDone)(nil)
type HandshakeDone struct {
protocol.HandshakeOpen
supportedFeatures int
}
func NewHandshakeDone() *HandshakeDone {
return &HandshakeDone{HandshakeOpen: *protocol.NewHandshakeOpen(nil, ""), supportedFeatures: -1}
}
func (h HandshakeDone) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
if !(*node).IsStarted() {
err := (*peer).End()
if err != nil {
return nil
}
return nil
}
if !bytes.Equal((*peer).GetChallenge(), h.HandshakeOpen.Challenge()) {
return errors.New("Invalid challenge")
}
/*
if !verifyId {
(*peer).SetId(h)
} else {
if !peer.ID.Equals(pId) {
return errInvalidChallenge
}
}
peer.IsConnected = true
supportedFeatures := data.UnpackInt()
if supportedFeatures != 3 {
return errors.New("Remote node does not support required features")
}
node.Services.P2P.Peers[peer.ID.String()] = peer
node.Services.P2P.ReconnectDelay[peer.ID.String()] = 1
connectionUrisCount := data.UnpackInt()
peer.ConnectionUris = make([]*url.URL, 0)
for i := 0; i < connectionUrisCount; i++ {
uriStr := data.UnpackString()
uri, err := url.Parse(uriStr)
if err != nil {
return err
}
peer.ConnectionUris = append(peer.ConnectionUris, uri)
}
// Log information - Assuming a logging method exists
node.Logger.Info(fmt.Sprintf("[+] %s (%s)", peer.ID.String(), peer.RenderLocationUri().String()))
// Send peer lists and emit 'peerConnected' event
// Assuming appropriate methods exist in node.Services.P2P
node.Services.P2P.SendPublicPeersToPeer(peer)
*/
return nil
}
func (h HandshakeDone) DecodeMessage(dec *msgpack.Decoder) error {
supportedFeatures, err := dec.DecodeInt()
if err != nil {
return err
}
h.supportedFeatures = supportedFeatures
return nil
}

53
protocol/signed/signed.go Normal file
View File

@ -0,0 +1,53 @@
package signed
import (
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"sync"
)
var (
messageTypes sync.Map
)
var (
_ IncomingMessage = (*IncomingMessageImpl)(nil)
)
type IncomingMessage interface {
protocol.IncomingMessage
}
type IncomingMessageImpl struct {
protocol.IncomingMessageImpl
message []byte
}
func init() {
messageTypes = sync.Map{}
RegisterMessageType(types.ProtocolMethodHandshakeDone, func() IncomingMessage {
return NewHandshakeDone()
})
}
func RegisterMessageType(messageType types.ProtocolMethod, factoryFunc func() IncomingMessage) {
if factoryFunc == nil {
panic("factoryFunc cannot be nil")
}
messageTypes.Store(messageType, factoryFunc)
}
func GetMessageType(kind types.ProtocolMethod) (protocol.IncomingMessage, bool) {
value, ok := messageTypes.Load(kind)
if !ok {
return nil, false
}
factoryFunc, ok := value.(func() IncomingMessage)
if !ok {
return nil, false
}
return factoryFunc(), true
}

109
protocol/signed_message.go Normal file
View File

@ -0,0 +1,109 @@
package protocol
import (
"crypto/ed25519"
"errors"
libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5"
)
var (
_ IncomingMessageTyped = (*SignedMessage)(nil)
_ msgpack.CustomDecoder = (*signedMessagePayoad)(nil)
)
var (
errInvalidSignature = errors.New("Invalid signature found")
)
type SignedMessage struct {
nodeId *encoding.NodeId
signature []byte
message []byte
IncomingMessageTypedImpl
}
type signedMessagePayoad struct {
kind int
message msgpack.RawMessage
}
func (s *signedMessagePayoad) DecodeMsgpack(dec *msgpack.Decoder) error {
kind, err := dec.DecodeInt()
if err != nil {
return err
}
s.kind = kind
message, err := dec.DecodeRaw()
if err != nil {
return err
}
s.message = message
return nil
}
func NewSignedMessage() *SignedMessage {
return &SignedMessage{}
}
func (s *SignedMessage) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error {
var payload signedMessagePayoad
err := msgpack.Unmarshal(s.message, &payload)
if err != nil {
return err
}
if msgHandler, valid := signed.GetMessageType(types.ProtocolMethod(payload.kind)); valid {
msgHandler.SetIncomingMessage(s)
err := msgpack.Unmarshal(payload.message, &msgHandler)
if err != nil {
return err
}
err = msgHandler.HandleMessage(node, peer, verifyId)
if err != nil {
return err
}
}
return nil
}
func (s *SignedMessage) DecodeMessage(dec *msgpack.Decoder) error {
nodeId, err := dec.DecodeBytes()
if err != nil {
return err
}
s.nodeId = encoding.NewNodeId(nodeId)
signature, err := dec.DecodeBytes()
if err != nil {
return err
}
s.signature = signature
message, err := dec.DecodeBytes()
if err != nil {
return err
}
s.message = message
if !ed25519.Verify(s.nodeId.Raw(), s.message, s.signature) {
return errInvalidSignature
}
return nil
}

1
service.go Normal file
View File

@ -0,0 +1 @@
package libs5_go

273
service/p2p.go Normal file
View File

@ -0,0 +1,273 @@
package service
import (
"errors"
libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"github.com/vmihailenco/msgpack/v5"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"net/url"
"time"
)
var _ Service = (*P2P)(nil)
var (
errUnsupportedProtocol = errors.New("unsupported protocol")
errConnectionIdMissingNodeID = errors.New("connection id missing node id")
)
const nodeBucketName = "nodes"
type P2P struct {
logger *zap.Logger
nodeKeyPair ed25519.KeyPairEd25519
localNodeID *encoding.NodeId
networkID string
nodesBucket *bolt.Bucket
node *libs5_go.Node
inited bool
reconnectDelay *structs.Map
peers *structs.Map
}
func NewP2P(node *libs5_go.Node) *P2P {
service := &P2P{
logger: node.Logger(),
nodeKeyPair: node.Config().KeyPair,
networkID: node.Config().P2P.Network,
node: node,
inited: false,
reconnectDelay: structs.NewMap(),
peers: structs.NewMap(),
}
return service
}
func (p *P2P) Node() *libs5_go.Node {
return p.node
}
func (p *P2P) Peers() *structs.Map {
return p.peers
}
func (p *P2P) Start() error {
err := p.Init()
if err != nil {
return err
}
config := p.Node().Config()
if len(config.P2P.Peers.Initial) > 0 {
initialPeers := config.P2P.Peers.Initial
for _, peer := range initialPeers {
u, err := url.Parse(peer)
if err != nil {
return err
}
err = p.ConnectToNode([]*url.URL{u}, false)
if err != nil {
return err
}
}
}
return nil
}
func (p *P2P) Stop() error {
panic("implement me")
}
func (p *P2P) Init() error {
if p.inited {
return nil
}
p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey())
err := p.Node().Db().Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(nodeBucketName))
if err != nil {
return err
}
p.nodesBucket = bucket
return nil
})
if err != nil {
return err
}
return nil
}
func (p *P2P) ConnectToNode(connectionUris []*url.URL, retried bool) error {
if !p.Node().IsStarted() {
return nil
}
unsupported, _ := url.Parse("http://0.0.0.0")
unsupported.Scheme = "unsupported"
var connectionUri *url.URL
for _, uri := range connectionUris {
if uri.Scheme == "ws:" || uri.Scheme == "wss:" {
connectionUri = uri
break
}
}
if connectionUri == nil {
for _, uri := range connectionUris {
if uri.Scheme == "tcp:" {
connectionUri = uri
break
}
}
}
if connectionUri == nil {
connectionUri = unsupported
}
if connectionUri.Scheme == "unsupported" {
return errUnsupportedProtocol
}
scheme := connectionUri.Scheme
if connectionUri.User == nil {
return errConnectionIdMissingNodeID
}
username := connectionUri.User.Username()
id, err := encoding.DecodeNodeId(username)
if err != nil {
return err
}
idString, err := id.ToString()
if err != nil {
return err
}
reconnectDelay := p.reconnectDelay.GetInt(idString)
if reconnectDelay == nil {
*reconnectDelay = 1
}
if id.Equals(p.localNodeID) {
return nil
}
p.logger.Debug("connect", zap.String("node", connectionUri.String()))
socket, err := net.CreateTransportSocket(scheme, connectionUri)
if err != nil {
if retried {
p.logger.Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err))
return nil
}
retried = true
p.logger.Error("failed to connect", zap.String("node", connectionUri.String()), zap.Error(err))
delay := *p.reconnectDelay.GetInt(idString)
p.reconnectDelay.PutInt(idString, delay*2)
time.Sleep(time.Duration(delay) * time.Second)
return p.ConnectToNode(connectionUris, retried)
}
peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{
Socket: socket,
Uris: []*url.URL{connectionUri},
})
if err != nil {
return err
}
(*peer).SetId(id)
return p.onNewPeer(peer, true)
}
func (p *P2P) onNewPeer(peer *net.Peer, verifyId bool) error {
challenge := protocol.GenerateChallenge()
pd := *peer
pd.SetChallenge(challenge)
p.onNewPeerListen(peer, verifyId)
handshakeOpenMsg, err := protocol.NewHandshakeOpen(challenge, p.networkID).ToMessage()
if err != nil {
return err
}
err = pd.SendMessage(handshakeOpenMsg)
if err != nil {
return err
}
return nil
}
func (p *P2P) onNewPeerListen(peer *net.Peer, verifyId bool) {
onDone := net.DoneCallback(func() {
peerId, err := (*peer).GetId().ToString()
if err != nil {
p.logger.Error("failed to get peer id", zap.Error(err))
return
}
// Handle closure of the connection
if p.peers.Contains(peerId) {
p.peers.Remove(peerId)
}
})
onError := net.ErrorCallback(func(args ...interface{}) {
p.logger.Error("peer error", zap.Any("args", args))
})
(*peer).ListenForMessages(func(message []byte) error {
imsg := protocol.NewIncomingMessageUnknown()
err := msgpack.Unmarshal(message, imsg)
if err != nil {
return err
}
handler, ok := protocol.GetMessageType(imsg.GetKind())
if ok {
handler.SetIncomingMessage(imsg)
err := msgpack.Unmarshal(imsg.Data(), handler)
if err != nil {
return err
}
err = handler.HandleMessage(p.node, peer, verifyId)
if err != nil {
return err
}
}
return nil
}, net.ListenerOptions{
OnDone: &onDone,
OnError: &onError,
Logger: p.logger,
})
}

10
service/service.go Normal file
View File

@ -0,0 +1,10 @@
package service
import libs5_go "git.lumeweb.com/LumeWeb/libs5-go"
type Service interface {
Node() *libs5_go.Node
Start() error
Stop() error
Init() error
}

123
structs/map.go Normal file
View File

@ -0,0 +1,123 @@
package structs
import (
"github.com/emirpasic/gods/maps/hashmap"
"log"
"sync"
)
type Map struct {
*hashmap.Map
mutex *sync.RWMutex
}
func NewMap() *Map {
return &Map{
Map: hashmap.New(),
mutex: &sync.RWMutex{},
}
}
func (m *Map) Get(key interface{}) (value interface{}, found bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.Map.Get(key)
}
func (m *Map) GetInt(key interface{}) (value *int) {
val, found := m.Get(key)
if !found {
return nil
}
if intValue, ok := val.(int); ok {
value = &intValue
} else {
log.Fatalf("value is not an int: %v", val)
}
return value
}
func (m *Map) GetString(key interface{}) (value *string) {
val, found := m.Get(key)
if !found {
return nil
}
if _, ok := val.(string); ok {
value = val.(*string)
} else {
log.Fatalf("value is not a string: %v", value)
}
return
}
func (m *Map) Put(key interface{}, value interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.Map.Put(key, value)
}
func (m *Map) PutInt(key interface{}, value int) {
m.Put(key, value)
}
func (m *Map) Remove(key interface{}) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.Map.Remove(key)
}
func (m *Map) Keys() []interface{} {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.Map.Keys()
}
func (m *Map) Values() []interface{} {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.Map.Values()
}
func (m *Map) Size() int {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.Map.Size()
}
func (m *Map) Empty() bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.Map.Empty()
}
func (m *Map) Clear() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.Map.Clear()
}
func (m *Map) String() string {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.Map.String()
}
func (m *Map) GetKey(value interface{}) (key interface{}, found bool) {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.Map.Get(value)
}
func (m *Map) Contains(value interface{}) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
_, has := m.Map.Get(value)
return has
}

3
types/feature.go Normal file
View File

@ -0,0 +1,3 @@
package types
const SupportedFeatures = 0x03