diff --git a/config.go b/config.go new file mode 100644 index 0000000..89c0508 --- /dev/null +++ b/config.go @@ -0,0 +1,22 @@ +package libs5_go + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/ed25519" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +type NodeConfig struct { + P2P P2PConfig + KeyPair ed25519.KeyPairEd25519 + DB bolt.DB + Logger *zap.Logger +} +type P2PConfig struct { + Network string + Peers PeersConfig +} + +type PeersConfig struct { + Initial []string +} diff --git a/go.mod b/go.mod index a64c4a2..0c2bcd0 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/emirpasic/gods v1.18.1 github.com/google/go-cmp v0.6.0 + github.com/gorilla/websocket v1.5.1 github.com/multiformats/go-multibase v0.2.0 github.com/vmihailenco/msgpack/v5 v5.4.1 go.etcd.io/bbolt v1.3.8 @@ -17,5 +18,6 @@ require ( github.com/multiformats/go-base36 v0.1.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.15.0 // indirect ) diff --git a/net/net.go b/net/net.go new file mode 100644 index 0000000..6d0cd27 --- /dev/null +++ b/net/net.go @@ -0,0 +1,63 @@ +package net + +import ( + "errors" + "net/url" + "sync" +) + +type TransportPeerConfig struct { + Socket interface{} + Uris []*url.URL +} + +type PeerStatic interface { + Connect(uri *url.URL) (interface{}, error) // Returns a connection/socket +} + +type PeerFactory interface { + NewPeer(options *TransportPeerConfig) (Peer, error) +} + +var ( + transports sync.Map +) + +func init() { + transports = sync.Map{} + //RegisterPeerType("ws", WebSocketPeer) + //RegisterPeerType("wss", WebSocketPeer) +} +func RegisterTransport(peerType string, factory interface{}) { + if _, ok := factory.(PeerFactory); !ok { + panic("factory must implement PeerFactory") + } + + if _, ok := factory.(PeerStatic); !ok { + panic("factory must implement PeerStatic") + } + + transports.Store(peerType, factory) +} + +func CreateTransportSocket(peerType string, uri *url.URL) (interface{}, error) { + static, ok := transports.Load(peerType) + if !ok { + return nil, errors.New("no static method registered for type: " + peerType) + } + + t, err := static.(PeerStatic).Connect(uri) + + return &t, err +} + +func CreateTransportPeer(peerType string, options *TransportPeerConfig) (*Peer, error) { + factory, ok := transports.Load(peerType) + if !ok { + return nil, errors.New("no factory registered for type: " + peerType) + } + + t, err := factory.(PeerFactory).NewPeer(options) + + return &t, err +} diff --git a/net/peer.go b/net/peer.go new file mode 100644 index 0000000..0e84bda --- /dev/null +++ b/net/peer.go @@ -0,0 +1,42 @@ +package net + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "go.uber.org/zap" + "net/url" +) + +// EventCallback type for the callback function +type EventCallback func(event []byte) error + +// DoneCallback type for the onDone callback +type DoneCallback func() + +// ErrorCallback type for the onError callback +type ErrorCallback func(args ...interface{}) + +// ListenerOptions struct for options +type ListenerOptions struct { + OnDone *DoneCallback + OnError *ErrorCallback + Logger *zap.Logger +} + +type Peer interface { + SendMessage(message []byte) error + RenderLocationURI() string + ListenForMessages(callback EventCallback, options ListenerOptions) + End() error + SetId(id *encoding.NodeId) + GetId() *encoding.NodeId + SetChallenge(challenge []byte) + GetChallenge() []byte +} + +type BasePeer struct { + ConnectionURIs []url.URL + IsConnected bool + challenge []byte + Socket interface{} + Id *encoding.NodeId +} diff --git a/net/ws.go b/net/ws.go new file mode 100644 index 0000000..bcf37e5 --- /dev/null +++ b/net/ws.go @@ -0,0 +1,64 @@ +package net + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "github.com/gorilla/websocket" +) + +type WebSocketPeer struct { + BasePeer + Socket *websocket.Conn +} + +func (p *WebSocketPeer) SendMessage(message []byte) { + err := p.Socket.WriteMessage(websocket.BinaryMessage, message) + if err != nil { + return + } +} + +func (p *WebSocketPeer) RenderLocationURI() string { + return p.Socket.RemoteAddr().String() +} + +func (p *WebSocketPeer) ListenForMessages(callback EventCallback, onClose func(), onError func(error)) { + for { + _, message, err := p.Socket.ReadMessage() + if err != nil { + if onError != nil { + onError(err) + } + break + } + + err = callback(message) + if err != nil { + if onError != nil { + onError(err) + } + } + } + + if onClose != nil { + onClose() + } +} + +func (p *WebSocketPeer) End() { + err := p.Socket.Close() + if err != nil { + return + } +} + +func (p *WebSocketPeer) SetId(id *encoding.NodeId) { + p.Id = id +} + +func (p *WebSocketPeer) SetChallenge(challenge []byte) { + p.challenge = challenge +} + +func (p *WebSocketPeer) GetChallenge() []byte { + return p.challenge +} diff --git a/node.go b/node.go new file mode 100644 index 0000000..dcae551 --- /dev/null +++ b/node.go @@ -0,0 +1,296 @@ +package libs5_go + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/service" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" +) + +type Metadata interface { + ToJson() map[string]interface{} +} + +type services struct { + p2p *service.P2P +} + +type Node struct { + nodeConfig *NodeConfig + metadataCache *structs.Map + started bool + hashQueryRoutingTable *structs.Map + services services +} + +func NewNode(config *NodeConfig) *Node { + return &Node{ + nodeConfig: config, + metadataCache: structs.NewMap(), + started: false, + hashQueryRoutingTable: structs.NewMap(), + } +} +func (n *Node) HashQueryRoutingTable() *structs.Map { + return n.hashQueryRoutingTable +} + +func (n *Node) IsStarted() bool { + return n.started +} + +func (n *Node) Config() *NodeConfig { + return n.nodeConfig +} + +func (n *Node) Logger() *zap.Logger { + if n.nodeConfig != nil { + return n.nodeConfig.Logger + } + return nil +} + +func (n *Node) Db() *bolt.DB { + if n.nodeConfig != nil { + return &n.nodeConfig.DB + } + return nil +} + +/* +func (n *Node) Services() *S5Services { + if n.nodeConfig != nil { + return n.nodeConfig.Services + } + return nil +} + + +func (n *Node) Start() error { + n.started = true + return nil +} + +func (n *Node) Stop() error { + n.started = false + return nil +} +func (n *Node) GetCachedStorageLocations(hash Multihash, types []int) (map[NodeId]*StorageLocation, error) { + locations := make(map[NodeId]*StorageLocation) + + mapFromDB, err := n.readStorageLocationsFromDB(hash) + if err != nil { + return nil, err + } + if len(mapFromDB) == 0 { + return make(map[NodeId]*StorageLocation), nil + } + + ts := time.Now().Unix() + + for _, t := range types { + nodeMap, ok := mapFromDB[t] + if !ok { + continue + } + + for key, value := range nodeMap { + if len(value) < 4 { + continue // or handle error + } + + expiry, ok := value[3].(int64) + if !ok || expiry < ts { + continue + } + + addresses, ok := value[1].([]string) + if !ok { + continue // or handle error + } + + storageLocation := NewStorageLocation(t, addresses, expiry) + if len(value) > 4 { + if providerMessage, ok := value[4].(string); ok { + storageLocation.ProviderMessage = providerMessage + } + } + + locations[NodeId(key)] = storageLocation + } + } + + return locations, nil +} +func (n *Node) ReadStorageLocationsFromDB(hash Multihash) (map[int]map[NodeId]map[int]interface{}, error) { + locations := make(map[int]map[NodeId]map[int]interface{}) + + bytes, err := n.config.CacheDb.Get(StringifyHash(hash)) // Assume StringifyHash and CacheDb.Get are implemented + if err != nil { + return locations, nil + } + if bytes == nil { + return locations, nil + } + + unpacker := NewUnpacker(bytes) // Assume NewUnpacker is implemented to handle the unpacking + mapLength, err := unpacker.UnpackMapLength() + if err != nil { + return nil, err + } + + for i := 0; i < mapLength; i++ { + t, err := unpacker.UnpackInt() + if err != nil { + continue // or handle error + } + + innerMap := make(map[NodeId]map[int]interface{}) + locations[t] = innerMap + + innerMapLength, err := unpacker.UnpackMapLength() + if err != nil { + continue // or handle error + } + + for j := 0; j < innerMapLength; j++ { + nodeIdBytes, err := unpacker.UnpackBinary() + if err != nil { + continue // or handle error + } + nodeId := NodeId(nodeIdBytes) + + // Assuming unpacker.UnpackMap() returns a map[string]interface{} and is implemented + unpackedMap, err := unpacker.UnpackMap() + if err != nil { + continue // or handle error + } + + convertedMap := make(map[int]interface{}) + for key, value := range unpackedMap { + intKey, err := strconv.Atoi(key) + if err != nil { + continue // or handle error + } + convertedMap[intKey] = value + } + innerMap[nodeId] = convertedMap + } + } + return locations, nil +} +func (n *Node) AddStorageLocation(hash Multihash, nodeId NodeId, location StorageLocation, message []byte, config S5Config) error { + // Read existing storage locations + mapFromDB, err := n.ReadStorageLocationsFromDB(hash) + if err != nil { + return err + } + + // Get or create the inner map for the specific type + innerMap, exists := mapFromDB[location.Type] + if !exists { + innerMap = make(map[NodeId]map[int]interface{}) + mapFromDB[location.Type] = innerMap + } + + // Create location map with new data + locationMap := make(map[int]interface{}) + locationMap[1] = location.Parts + // locationMap[2] = location.BinaryParts // Uncomment if BinaryParts is a field of StorageLocation + locationMap[3] = location.Expiry + locationMap[4] = message + + // Update the inner map with the new location + innerMap[nodeId] = locationMap + + // Serialize the updated map and store it in the database + packedBytes, err := NewPacker().Pack(mapFromDB) // Assuming NewPacker and Pack are implemented + if err != nil { + return err + } + + err = config.CacheDb.Put(StringifyHash(hash), packedBytes) // Assume CacheDb.Put and StringifyHash are implemented + if err != nil { + return err + } + + return nil +} + +func (n *Node) DownloadBytesByHash(hash Multihash) ([]byte, error) { + dlUriProvider := NewStorageLocationProvider(n, hash, []int{storageLocationTypeFull, storageLocationTypeFile}) + dlUriProvider.Start() + + retryCount := 0 + for { + dlUri, err := dlUriProvider.Next() + if err != nil { + return nil, err + } + + n.Logger.Verbose(fmt.Sprintf("[try] %s", dlUri.Location.BytesUrl)) + + client := &http.Client{ + Timeout: 30 * time.Second, + } + res, err := client.Get(dlUri.Location.BytesUrl) + if err != nil { + n.Logger.Catched(err) + + dlUriProvider.Downvote(dlUri) + + retryCount++ + if retryCount > 32 { + return nil, errors.New("too many retries") + } + continue + } + defer res.Body.Close() + + data, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + + // Assuming blake3 and equalBytes functions are available + resHash := blake3(data) + + if !equalBytes(hash.HashBytes, resHash) { + dlUriProvider.Downvote(dlUri) + continue + } + + dlUriProvider.Upvote(dlUri) + return data, nil + } +} + +func (n *Node) GetMetadataByCID(cid CID) (Metadata, error) { + var metadata Metadata + var ok bool + + if metadata, ok = n.MetadataCache[cid.Hash]; !ok { + bytes, err := n.DownloadBytesByHash(cid.Hash) + if err != nil { + return Metadata{}, err + } + + switch cid.Type { + case METADATA_MEDIA, BRIDGE: // Both cases use the same deserialization method + metadata, err = deserializeMediaMetadata(bytes) + case METADATA_WEBAPP: + metadata, err = deserializeWebAppMetadata(bytes) + default: + return Metadata{}, errors.New("unsupported metadata format") + } + + if err != nil { + return Metadata{}, err + } + + n.MetadataCache[cid.Hash] = metadata + } + + return metadata, nil +} +*/ diff --git a/protocol/encodeable_message.go b/protocol/encodeable_message.go new file mode 100644 index 0000000..7f0e281 --- /dev/null +++ b/protocol/encodeable_message.go @@ -0,0 +1,19 @@ +package protocol + +import "github.com/vmihailenco/msgpack/v5" + +type EncodeableMessage interface { + ToMessage() (message []byte, err error) + msgpack.CustomEncoder +} + +type EncodeableMessageImpl struct { +} + +func (e EncodeableMessageImpl) ToMessage() (message []byte, err error) { + return msgpack.Marshal(e) +} + +func (e EncodeableMessageImpl) EncodeMsgpack(encoder *msgpack.Encoder) error { + panic("this method should be implemented by the child class") +} diff --git a/protocol/handshake_open.go b/protocol/handshake_open.go new file mode 100644 index 0000000..44c58f0 --- /dev/null +++ b/protocol/handshake_open.go @@ -0,0 +1,61 @@ +package protocol + +import ( + "errors" + "git.lumeweb.com/LumeWeb/libs5-go" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "github.com/vmihailenco/msgpack/v5" +) + +type HandshakeOpen struct { + challenge []byte + networkId string + IncomingMessageTypedImpl + IncomingMessageHandler +} + +func (m HandshakeOpen) Challenge() []byte { + return m.challenge +} + +func (m HandshakeOpen) NetworkId() string { + return m.networkId +} + +var _ EncodeableMessage = (*HandshakeOpen)(nil) +var ( + errInvalidChallenge = errors.New("Invalid challenge") +) + +func NewHandshakeOpen(challenge []byte, networkId string) *HandshakeOpen { + return &HandshakeOpen{ + challenge: challenge, + networkId: networkId, + } +} +func (m HandshakeOpen) EncodeMsgpack(enc *msgpack.Encoder) error { + err := enc.EncodeUint(uint64(types.ProtocolMethodHandshakeOpen)) + if err != nil { + return err + } + + err = enc.EncodeBytes(m.challenge) + if err != nil { + return err + } + + if m.networkId != "" { + err = enc.EncodeString(m.networkId) + if err != nil { + return err + } + } + + return nil +} + +func (m *HandshakeOpen) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { + + return nil +} diff --git a/protocol/hash_query.go b/protocol/hash_query.go new file mode 100644 index 0000000..f824027 --- /dev/null +++ b/protocol/hash_query.go @@ -0,0 +1,50 @@ +package protocol + +import ( + libs5_go "git.lumeweb.com/LumeWeb/libs5-go" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "github.com/vmihailenco/msgpack/v5" +) + +var _ IncomingMessageTyped = (*HashQuery)(nil) + +type HashQuery struct { + hash *encoding.Multihash + kinds []int + + IncomingMessageTypedImpl + IncomingMessageHandler +} + +func (h HashQuery) Hash() *encoding.Multihash { + return h.hash +} + +func (h HashQuery) Kinds() []int { + return h.kinds +} + +func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder) error { + hash, err := dec.DecodeBytes() + + if err != nil { + return err + } + + h.hash = encoding.NewMultihash(hash) + + var kinds []int + err = dec.Decode(&kinds) + if err != nil { + return err + } + + h.kinds = kinds + + return nil +} +func (h *HashQuery) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { + + return nil +} diff --git a/protocol/incoming_message.go b/protocol/incoming_message.go new file mode 100644 index 0000000..d8519ab --- /dev/null +++ b/protocol/incoming_message.go @@ -0,0 +1,107 @@ +package protocol + +import ( + "fmt" + "git.lumeweb.com/LumeWeb/libs5-go" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "github.com/vmihailenco/msgpack/v5" + "net/url" +) + +var ( + _ EncodeableMessage = (*EncodeableMessageImpl)(nil) +) + +type IncomingMessage interface { + HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error + SetIncomingMessage(msg IncomingMessage) + msgpack.CustomDecoder +} + +type IncomingMessageTyped interface { + DecodeMessage(dec *msgpack.Decoder) error + IncomingMessage +} + +type IncomingMessageImpl struct { + kind types.ProtocolMethod + data msgpack.RawMessage + known bool +} + +func (i *IncomingMessageImpl) SetIncomingMessage(msg IncomingMessage) { + *i = interface{}(msg).(IncomingMessageImpl) +} + +var _ msgpack.CustomDecoder = (*IncomingMessageImpl)(nil) +var _ IncomingMessage = (*IncomingMessageImpl)(nil) + +func (i *IncomingMessageImpl) GetKind() types.ProtocolMethod { + return i.kind +} + +func (i *IncomingMessageImpl) ToMessage() (message []byte, err error) { + return msgpack.Marshal(i) +} + +func (i *IncomingMessageImpl) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { + panic("child class should implement this method") +} + +func (i *IncomingMessageImpl) Kind() types.ProtocolMethod { + return i.kind +} + +func (i *IncomingMessageImpl) Data() msgpack.RawMessage { + return i.data +} + +type IncomingMessageTypedImpl struct { + IncomingMessageImpl +} + +func NewIncomingMessageUnknown() *IncomingMessageImpl { + return &IncomingMessageImpl{ + known: false, + } +} + +func NewIncomingMessageKnown(kind types.ProtocolMethod, data msgpack.RawMessage) *IncomingMessageImpl { + return &IncomingMessageImpl{ + kind: kind, + data: data, + known: true, + } +} + +func NewIncomingMessageTyped(kind types.ProtocolMethod, data msgpack.RawMessage) *IncomingMessageTypedImpl { + known := NewIncomingMessageKnown(kind, data) + return &IncomingMessageTypedImpl{*known} +} + +type IncomingMessageHandler func(node *libs5_go.Node, peer *net.Peer, u *url.URL, verifyId bool) error + +func (i *IncomingMessageImpl) DecodeMsgpack(dec *msgpack.Decoder) error { + if i.known { + if msgTyped, ok := interface{}(i).(IncomingMessageTyped); ok { + return msgTyped.DecodeMessage(dec) + } + return fmt.Errorf("type assertion to IncomingMessageTyped failed") + } + + kind, err := dec.DecodeInt() + if err != nil { + return err + } + + i.kind = types.ProtocolMethod(kind) + + raw, err := dec.DecodeRaw() + if err != nil { + return err + } + + i.data = raw + return nil +} diff --git a/protocol/message.go b/protocol/message.go new file mode 100644 index 0000000..c1204a3 --- /dev/null +++ b/protocol/message.go @@ -0,0 +1,47 @@ +package protocol + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/types" + "sync" +) + +var ( + messageTypes sync.Map +) + +var ( + _ IncomingMessage = (*IncomingMessageImpl)(nil) +) + +func init() { + messageTypes = sync.Map{} + + // Register factory functions instead of instances + RegisterMessageType(types.ProtocolMethodHandshakeOpen, func() IncomingMessage { + return NewHandshakeOpen([]byte{}, "") + }) + RegisterMessageType(types.ProtocolMethodSignedMessage, func() IncomingMessage { + return NewSignedMessage() + }) +} + +func RegisterMessageType(messageType types.ProtocolMethod, factoryFunc func() IncomingMessage) { + if factoryFunc == nil { + panic("factoryFunc cannot be nil") + } + messageTypes.Store(messageType, factoryFunc) +} + +func GetMessageType(kind types.ProtocolMethod) (IncomingMessage, bool) { + value, ok := messageTypes.Load(kind) + if !ok { + return nil, false + } + + factoryFunc, ok := value.(func() IncomingMessage) + if !ok { + return nil, false + } + + return factoryFunc(), true +} diff --git a/protocol/protocol.go b/protocol/protocol.go new file mode 100644 index 0000000..c68b40c --- /dev/null +++ b/protocol/protocol.go @@ -0,0 +1,13 @@ +package protocol + +import "crypto/rand" + +func GenerateChallenge() []byte { + challenge := make([]byte, 32) + _, err := rand.Read(challenge) + if err != nil { + panic(err) + } + + return challenge +} diff --git a/protocol/signed/handshake_done.go b/protocol/signed/handshake_done.go new file mode 100644 index 0000000..da0f897 --- /dev/null +++ b/protocol/signed/handshake_done.go @@ -0,0 +1,86 @@ +package signed + +import ( + "bytes" + "errors" + libs5_go "git.lumeweb.com/LumeWeb/libs5-go" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" + "github.com/vmihailenco/msgpack/v5" +) + +var _ protocol.IncomingMessageTyped = (*HandshakeDone)(nil) + +type HandshakeDone struct { + protocol.HandshakeOpen + supportedFeatures int +} + +func NewHandshakeDone() *HandshakeDone { + return &HandshakeDone{HandshakeOpen: *protocol.NewHandshakeOpen(nil, ""), supportedFeatures: -1} +} + +func (h HandshakeDone) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { + if !(*node).IsStarted() { + err := (*peer).End() + if err != nil { + return nil + } + return nil + } + + if !bytes.Equal((*peer).GetChallenge(), h.HandshakeOpen.Challenge()) { + return errors.New("Invalid challenge") + } + /* + if !verifyId { + (*peer).SetId(h) + } else { + if !peer.ID.Equals(pId) { + return errInvalidChallenge + } + } + + peer.IsConnected = true + + supportedFeatures := data.UnpackInt() + + if supportedFeatures != 3 { + return errors.New("Remote node does not support required features") + } + + node.Services.P2P.Peers[peer.ID.String()] = peer + node.Services.P2P.ReconnectDelay[peer.ID.String()] = 1 + + connectionUrisCount := data.UnpackInt() + + peer.ConnectionUris = make([]*url.URL, 0) + for i := 0; i < connectionUrisCount; i++ { + uriStr := data.UnpackString() + uri, err := url.Parse(uriStr) + if err != nil { + return err + } + peer.ConnectionUris = append(peer.ConnectionUris, uri) + } + + // Log information - Assuming a logging method exists + node.Logger.Info(fmt.Sprintf("[+] %s (%s)", peer.ID.String(), peer.RenderLocationUri().String())) + + // Send peer lists and emit 'peerConnected' event + // Assuming appropriate methods exist in node.Services.P2P + node.Services.P2P.SendPublicPeersToPeer(peer) + */ + return nil +} + +func (h HandshakeDone) DecodeMessage(dec *msgpack.Decoder) error { + supportedFeatures, err := dec.DecodeInt() + + if err != nil { + return err + } + + h.supportedFeatures = supportedFeatures + return nil +} diff --git a/protocol/signed/signed.go b/protocol/signed/signed.go new file mode 100644 index 0000000..438412c --- /dev/null +++ b/protocol/signed/signed.go @@ -0,0 +1,53 @@ +package signed + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/protocol" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "sync" +) + +var ( + messageTypes sync.Map +) + +var ( + _ IncomingMessage = (*IncomingMessageImpl)(nil) +) + +type IncomingMessage interface { + protocol.IncomingMessage +} + +type IncomingMessageImpl struct { + protocol.IncomingMessageImpl + message []byte +} + +func init() { + messageTypes = sync.Map{} + + RegisterMessageType(types.ProtocolMethodHandshakeDone, func() IncomingMessage { + return NewHandshakeDone() + }) +} + +func RegisterMessageType(messageType types.ProtocolMethod, factoryFunc func() IncomingMessage) { + if factoryFunc == nil { + panic("factoryFunc cannot be nil") + } + messageTypes.Store(messageType, factoryFunc) +} + +func GetMessageType(kind types.ProtocolMethod) (protocol.IncomingMessage, bool) { + value, ok := messageTypes.Load(kind) + if !ok { + return nil, false + } + + factoryFunc, ok := value.(func() IncomingMessage) + if !ok { + return nil, false + } + + return factoryFunc(), true +} diff --git a/protocol/signed_message.go b/protocol/signed_message.go new file mode 100644 index 0000000..4140688 --- /dev/null +++ b/protocol/signed_message.go @@ -0,0 +1,109 @@ +package protocol + +import ( + "crypto/ed25519" + "errors" + libs5_go "git.lumeweb.com/LumeWeb/libs5-go" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "github.com/vmihailenco/msgpack/v5" +) + +var ( + _ IncomingMessageTyped = (*SignedMessage)(nil) + _ msgpack.CustomDecoder = (*signedMessagePayoad)(nil) +) + +var ( + errInvalidSignature = errors.New("Invalid signature found") +) + +type SignedMessage struct { + nodeId *encoding.NodeId + signature []byte + message []byte + IncomingMessageTypedImpl +} + +type signedMessagePayoad struct { + kind int + message msgpack.RawMessage +} + +func (s *signedMessagePayoad) DecodeMsgpack(dec *msgpack.Decoder) error { + kind, err := dec.DecodeInt() + if err != nil { + return err + } + + s.kind = kind + + message, err := dec.DecodeRaw() + if err != nil { + return err + } + + s.message = message + + return nil +} + +func NewSignedMessage() *SignedMessage { + return &SignedMessage{} +} + +func (s *SignedMessage) HandleMessage(node *libs5_go.Node, peer *net.Peer, verifyId bool) error { + var payload signedMessagePayoad + + err := msgpack.Unmarshal(s.message, &payload) + if err != nil { + return err + } + + if msgHandler, valid := signed.GetMessageType(types.ProtocolMethod(payload.kind)); valid { + msgHandler.SetIncomingMessage(s) + err := msgpack.Unmarshal(payload.message, &msgHandler) + if err != nil { + return err + } + + err = msgHandler.HandleMessage(node, peer, verifyId) + if err != nil { + return err + } + } + + return nil +} + +func (s *SignedMessage) DecodeMessage(dec *msgpack.Decoder) error { + nodeId, err := dec.DecodeBytes() + if err != nil { + return err + } + + s.nodeId = encoding.NewNodeId(nodeId) + + signature, err := dec.DecodeBytes() + if err != nil { + return err + } + + s.signature = signature + + message, err := dec.DecodeBytes() + if err != nil { + return err + } + + s.message = message + + if !ed25519.Verify(s.nodeId.Raw(), s.message, s.signature) { + return errInvalidSignature + } + + return nil + +} diff --git a/service.go b/service.go new file mode 100644 index 0000000..7b4404f --- /dev/null +++ b/service.go @@ -0,0 +1 @@ +package libs5_go diff --git a/service/p2p.go b/service/p2p.go new file mode 100644 index 0000000..3874f4d --- /dev/null +++ b/service/p2p.go @@ -0,0 +1,273 @@ +package service + +import ( + "errors" + libs5_go "git.lumeweb.com/LumeWeb/libs5-go" + "git.lumeweb.com/LumeWeb/libs5-go/ed25519" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "github.com/vmihailenco/msgpack/v5" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" + "net/url" + "time" +) + +var _ Service = (*P2P)(nil) + +var ( + errUnsupportedProtocol = errors.New("unsupported protocol") + errConnectionIdMissingNodeID = errors.New("connection id missing node id") +) + +const nodeBucketName = "nodes" + +type P2P struct { + logger *zap.Logger + nodeKeyPair ed25519.KeyPairEd25519 + localNodeID *encoding.NodeId + networkID string + nodesBucket *bolt.Bucket + node *libs5_go.Node + inited bool + reconnectDelay *structs.Map + peers *structs.Map +} + +func NewP2P(node *libs5_go.Node) *P2P { + service := &P2P{ + logger: node.Logger(), + nodeKeyPair: node.Config().KeyPair, + networkID: node.Config().P2P.Network, + node: node, + inited: false, + reconnectDelay: structs.NewMap(), + peers: structs.NewMap(), + } + + return service +} + +func (p *P2P) Node() *libs5_go.Node { + return p.node +} + +func (p *P2P) Peers() *structs.Map { + return p.peers +} + +func (p *P2P) Start() error { + err := p.Init() + if err != nil { + return err + } + + config := p.Node().Config() + if len(config.P2P.Peers.Initial) > 0 { + initialPeers := config.P2P.Peers.Initial + + for _, peer := range initialPeers { + u, err := url.Parse(peer) + if err != nil { + return err + } + err = p.ConnectToNode([]*url.URL{u}, false) + if err != nil { + return err + } + } + } + + return nil +} + +func (p *P2P) Stop() error { + panic("implement me") +} + +func (p *P2P) Init() error { + if p.inited { + return nil + } + p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey()) + + err := p.Node().Db().Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists([]byte(nodeBucketName)) + if err != nil { + return err + } + + p.nodesBucket = bucket + + return nil + }) + if err != nil { + return err + } + + return nil +} +func (p *P2P) ConnectToNode(connectionUris []*url.URL, retried bool) error { + if !p.Node().IsStarted() { + return nil + } + + unsupported, _ := url.Parse("http://0.0.0.0") + unsupported.Scheme = "unsupported" + + var connectionUri *url.URL + + for _, uri := range connectionUris { + if uri.Scheme == "ws:" || uri.Scheme == "wss:" { + connectionUri = uri + break + } + } + + if connectionUri == nil { + for _, uri := range connectionUris { + if uri.Scheme == "tcp:" { + connectionUri = uri + break + } + } + } + + if connectionUri == nil { + connectionUri = unsupported + } + + if connectionUri.Scheme == "unsupported" { + return errUnsupportedProtocol + } + + scheme := connectionUri.Scheme + + if connectionUri.User == nil { + return errConnectionIdMissingNodeID + } + + username := connectionUri.User.Username() + id, err := encoding.DecodeNodeId(username) + if err != nil { + return err + } + + idString, err := id.ToString() + if err != nil { + return err + } + + reconnectDelay := p.reconnectDelay.GetInt(idString) + if reconnectDelay == nil { + *reconnectDelay = 1 + } + + if id.Equals(p.localNodeID) { + return nil + } + + p.logger.Debug("connect", zap.String("node", connectionUri.String())) + + socket, err := net.CreateTransportSocket(scheme, connectionUri) + if err != nil { + if retried { + p.logger.Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err)) + return nil + } + retried = true + + p.logger.Error("failed to connect", zap.String("node", connectionUri.String()), zap.Error(err)) + + delay := *p.reconnectDelay.GetInt(idString) + p.reconnectDelay.PutInt(idString, delay*2) + + time.Sleep(time.Duration(delay) * time.Second) + + return p.ConnectToNode(connectionUris, retried) + } + + peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{ + Socket: socket, + Uris: []*url.URL{connectionUri}, + }) + + if err != nil { + return err + } + + (*peer).SetId(id) + return p.onNewPeer(peer, true) +} + +func (p *P2P) onNewPeer(peer *net.Peer, verifyId bool) error { + challenge := protocol.GenerateChallenge() + + pd := *peer + pd.SetChallenge(challenge) + + p.onNewPeerListen(peer, verifyId) + + handshakeOpenMsg, err := protocol.NewHandshakeOpen(challenge, p.networkID).ToMessage() + + if err != nil { + return err + } + + err = pd.SendMessage(handshakeOpenMsg) + if err != nil { + return err + } + return nil +} +func (p *P2P) onNewPeerListen(peer *net.Peer, verifyId bool) { + onDone := net.DoneCallback(func() { + peerId, err := (*peer).GetId().ToString() + if err != nil { + p.logger.Error("failed to get peer id", zap.Error(err)) + return + } + + // Handle closure of the connection + if p.peers.Contains(peerId) { + p.peers.Remove(peerId) + } + }) + + onError := net.ErrorCallback(func(args ...interface{}) { + p.logger.Error("peer error", zap.Any("args", args)) + }) + + (*peer).ListenForMessages(func(message []byte) error { + imsg := protocol.NewIncomingMessageUnknown() + + err := msgpack.Unmarshal(message, imsg) + if err != nil { + return err + } + + handler, ok := protocol.GetMessageType(imsg.GetKind()) + + if ok { + + handler.SetIncomingMessage(imsg) + + err := msgpack.Unmarshal(imsg.Data(), handler) + if err != nil { + return err + } + err = handler.HandleMessage(p.node, peer, verifyId) + if err != nil { + return err + } + } + + return nil + }, net.ListenerOptions{ + OnDone: &onDone, + OnError: &onError, + Logger: p.logger, + }) +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..f746929 --- /dev/null +++ b/service/service.go @@ -0,0 +1,10 @@ +package service + +import libs5_go "git.lumeweb.com/LumeWeb/libs5-go" + +type Service interface { + Node() *libs5_go.Node + Start() error + Stop() error + Init() error +} diff --git a/structs/map.go b/structs/map.go new file mode 100644 index 0000000..0e6b125 --- /dev/null +++ b/structs/map.go @@ -0,0 +1,123 @@ +package structs + +import ( + "github.com/emirpasic/gods/maps/hashmap" + "log" + "sync" +) + +type Map struct { + *hashmap.Map + mutex *sync.RWMutex +} + +func NewMap() *Map { + return &Map{ + Map: hashmap.New(), + mutex: &sync.RWMutex{}, + } +} + +func (m *Map) Get(key interface{}) (value interface{}, found bool) { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.Map.Get(key) +} + +func (m *Map) GetInt(key interface{}) (value *int) { + val, found := m.Get(key) + + if !found { + return nil + } + + if intValue, ok := val.(int); ok { + value = &intValue + } else { + log.Fatalf("value is not an int: %v", val) + } + + return value +} + +func (m *Map) GetString(key interface{}) (value *string) { + val, found := m.Get(key) + + if !found { + return nil + } + + if _, ok := val.(string); ok { + value = val.(*string) + } else { + log.Fatalf("value is not a string: %v", value) + } + + return +} + +func (m *Map) Put(key interface{}, value interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.Map.Put(key, value) +} + +func (m *Map) PutInt(key interface{}, value int) { + m.Put(key, value) +} + +func (m *Map) Remove(key interface{}) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.Map.Remove(key) +} + +func (m *Map) Keys() []interface{} { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.Map.Keys() +} + +func (m *Map) Values() []interface{} { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.Map.Values() +} + +func (m *Map) Size() int { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.Map.Size() +} + +func (m *Map) Empty() bool { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.Map.Empty() +} + +func (m *Map) Clear() { + m.mutex.Lock() + defer m.mutex.Unlock() + m.Map.Clear() +} + +func (m *Map) String() string { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.Map.String() +} + +func (m *Map) GetKey(value interface{}) (key interface{}, found bool) { + m.mutex.RLock() + defer m.mutex.RUnlock() + return m.Map.Get(value) +} + +func (m *Map) Contains(value interface{}) bool { + m.mutex.RLock() + defer m.mutex.RUnlock() + _, has := m.Map.Get(value) + + return has +} diff --git a/types/feature.go b/types/feature.go new file mode 100644 index 0000000..efb17e9 --- /dev/null +++ b/types/feature.go @@ -0,0 +1,3 @@ +package types + +const SupportedFeatures = 0x03