diff --git a/protocol/base/base.go b/protocol/base/base.go deleted file mode 100644 index efd7988..0000000 --- a/protocol/base/base.go +++ /dev/null @@ -1,73 +0,0 @@ -package base - -import ( - "context" - "git.lumeweb.com/LumeWeb/libs5-go/config" - "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/service" - "github.com/vmihailenco/msgpack/v5" - "go.uber.org/zap" - "io" -) - -var ( - _ msgpack.CustomDecoder = (*IncomingMessageReader)(nil) -) - -type IncomingMessage interface { - HandleMessage(message IncomingMessageData) error - DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error - HandshakeRequirer -} - -type IncomingMessageData struct { - Original []byte - Data []byte - Ctx context.Context - Services service.Services - Logger *zap.Logger - Peer net.Peer - Config *config.NodeConfig - VerifyId bool -} - -type IncomingMessageReader struct { - Kind int - Data []byte -} - -func (i *IncomingMessageReader) DecodeMsgpack(dec *msgpack.Decoder) error { - kind, err := dec.DecodeInt() - if err != nil { - return err - } - - i.Kind = kind - - raw, err := io.ReadAll(dec.Buffered()) - - if err != nil { - return err - } - - i.Data = raw - - return nil -} - -type HandshakeRequirer interface { - RequiresHandshake() bool - SetRequiresHandshake(value bool) -} - -type HandshakeRequirement struct { - requiresHandshake bool -} - -func (hr *HandshakeRequirement) RequiresHandshake() bool { - return hr.requiresHandshake -} - -func (hr *HandshakeRequirement) SetRequiresHandshake(value bool) { - hr.requiresHandshake = value -} diff --git a/protocol/base/encodeable_message.go b/protocol/base/encodeable_message.go deleted file mode 100644 index 852546d..0000000 --- a/protocol/base/encodeable_message.go +++ /dev/null @@ -1,7 +0,0 @@ -package base - -import "github.com/vmihailenco/msgpack/v5" - -type EncodeableMessage interface { - msgpack.CustomEncoder -} diff --git a/protocol/handshake_open.go b/protocol/handshake_open.go index bc6d37f..e2abc2e 100644 --- a/protocol/handshake_open.go +++ b/protocol/handshake_open.go @@ -2,20 +2,19 @@ package protocol import ( "fmt" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" ) -var _ base.EncodeableMessage = (*HandshakeOpen)(nil) -var _ base.IncomingMessage = (*HandshakeOpen)(nil) +var _ EncodeableMessage = (*HandshakeOpen)(nil) +var _ IncomingMessage = (*HandshakeOpen)(nil) type HandshakeOpen struct { challenge []byte networkId string handshake []byte - base.HandshakeRequirement + HandshakeRequirement } func (h *HandshakeOpen) SetHandshake(handshake []byte) { @@ -30,8 +29,6 @@ func (h HandshakeOpen) NetworkId() string { return h.networkId } -var _ base.EncodeableMessage = (*HandshakeOpen)(nil) - func NewHandshakeOpen(challenge []byte, networkId string) *HandshakeOpen { ho := &HandshakeOpen{ challenge: challenge, @@ -62,7 +59,7 @@ func (h HandshakeOpen) EncodeMsgpack(enc *msgpack.Encoder) error { return nil } -func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { +func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error { handshake, err := dec.DecodeBytes() if err != nil { @@ -93,7 +90,7 @@ func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message base.Incomin return nil } -func (h *HandshakeOpen) HandleMessage(message base.IncomingMessageData) error { +func (h *HandshakeOpen) HandleMessage(message IncomingMessageData) error { peer := message.Peer services := message.Services diff --git a/protocol/hash_query.go b/protocol/hash_query.go index a8ba12e..0309be5 100644 --- a/protocol/hash_query.go +++ b/protocol/hash_query.go @@ -13,12 +13,12 @@ import ( ) var _ base.EncodeableMessage = (*HashQuery)(nil) -var _ base.IncomingMessage = (*HashQuery)(nil) +var _ IncomingMessage = (*HashQuery)(nil) type HashQuery struct { hash *encoding.Multihash kinds []types.StorageLocationType - base.HandshakeRequirement + HandshakeRequirement } func NewHashQuery() *HashQuery { @@ -47,7 +47,7 @@ func (h HashQuery) Kinds() []types.StorageLocationType { return h.kinds } -func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { +func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error { hash, err := dec.DecodeBytes() if err != nil { @@ -88,7 +88,7 @@ func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error { return nil } -func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { +func (h *HashQuery) HandleMessage(message IncomingMessageData) error { peer := message.Peer services := message.Services logger := message.Logger diff --git a/protocol/message.go b/protocol/message.go index 38d7e41..07ac176 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -1,48 +1,47 @@ package protocol import ( - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/types" ) var ( - messageTypes map[int]func() base.IncomingMessage + messageTypes map[int]func() IncomingMessage ) func RegisterProtocols() { - messageTypes = make(map[int]func() base.IncomingMessage) + messageTypes = make(map[int]func() IncomingMessage) // Register factory functions instead of instances - RegisterMessageType(int(types.ProtocolMethodHandshakeOpen), func() base.IncomingMessage { + RegisterMessageType(int(types.ProtocolMethodHandshakeOpen), func() IncomingMessage { return NewHandshakeOpen([]byte{}, "") }) - RegisterMessageType(int(types.ProtocolMethodHashQuery), func() base.IncomingMessage { + RegisterMessageType(int(types.ProtocolMethodHashQuery), func() IncomingMessage { return NewHashQuery() }) - RegisterMessageType(int(types.RecordTypeStorageLocation), func() base.IncomingMessage { + RegisterMessageType(int(types.RecordTypeStorageLocation), func() IncomingMessage { return NewStorageLocation() }) - RegisterMessageType(int(types.RecordTypeRegistryEntry), func() base.IncomingMessage { + RegisterMessageType(int(types.RecordTypeRegistryEntry), func() IncomingMessage { return NewEmptyRegistryEntryRequest() }) - RegisterMessageType(int(types.ProtocolMethodRegistryQuery), func() base.IncomingMessage { + RegisterMessageType(int(types.ProtocolMethodRegistryQuery), func() IncomingMessage { return NewEmptyRegistryQuery() }) - RegisterMessageType(int(types.ProtocolMethodSignedMessage), func() base.IncomingMessage { + RegisterMessageType(int(types.ProtocolMethodSignedMessage), func() IncomingMessage { return signed.NewSignedMessage() }) } -func RegisterMessageType(messageType int, factoryFunc func() base.IncomingMessage) { +func RegisterMessageType(messageType int, factoryFunc func() IncomingMessage) { if factoryFunc == nil { panic("factoryFunc cannot be nil") } messageTypes[messageType] = factoryFunc } -func GetMessageType(kind int) (base.IncomingMessage, bool) { +func GetMessageType(kind int) (IncomingMessage, bool) { value, ok := messageTypes[kind] if !ok { return nil, false diff --git a/protocol/protocol.go b/protocol/protocol.go index 786a29e..cb5562e 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,7 +1,14 @@ package protocol import ( + "context" "crypto/rand" + "git.lumeweb.com/LumeWeb/libs5-go/config" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/service" + "github.com/vmihailenco/msgpack/v5" + "go.uber.org/zap" + "io" "math" ) @@ -26,3 +33,68 @@ func CalculateNodeScore(goodResponses, badResponses int) float64 { return score } + +var ( + _ msgpack.CustomDecoder = (*IncomingMessageReader)(nil) +) + +type IncomingMessage interface { + HandleMessage(message IncomingMessageData) error + DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error + HandshakeRequirer +} +type EncodeableMessage interface { + msgpack.CustomEncoder +} + +type IncomingMessageData struct { + Original []byte + Data []byte + Ctx context.Context + Services service.Services + Logger *zap.Logger + Peer net.Peer + Config *config.NodeConfig + VerifyId bool +} + +type IncomingMessageReader struct { + Kind int + Data []byte +} + +func (i *IncomingMessageReader) DecodeMsgpack(dec *msgpack.Decoder) error { + kind, err := dec.DecodeInt() + if err != nil { + return err + } + + i.Kind = kind + + raw, err := io.ReadAll(dec.Buffered()) + + if err != nil { + return err + } + + i.Data = raw + + return nil +} + +type HandshakeRequirer interface { + RequiresHandshake() bool + SetRequiresHandshake(value bool) +} + +type HandshakeRequirement struct { + requiresHandshake bool +} + +func (hr *HandshakeRequirement) RequiresHandshake() bool { + return hr.requiresHandshake +} + +func (hr *HandshakeRequirement) SetRequiresHandshake(value bool) { + hr.requiresHandshake = value +} diff --git a/protocol/registry_entry.go b/protocol/registry_entry.go index 8e81f5c..2ba52ef 100644 --- a/protocol/registry_entry.go +++ b/protocol/registry_entry.go @@ -1,17 +1,16 @@ package protocol import ( - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" ) -var _ base.IncomingMessage = (*RegistryEntryRequest)(nil) -var _ base.EncodeableMessage = (*RegistryEntryRequest)(nil) +var _ IncomingMessage = (*RegistryEntryRequest)(nil) +var _ EncodeableMessage = (*RegistryEntryRequest)(nil) type RegistryEntryRequest struct { sre SignedRegistryEntry - base.HandshakeRequirement + HandshakeRequirement } func NewEmptyRegistryEntryRequest() *RegistryEntryRequest { @@ -39,7 +38,7 @@ func (s *RegistryEntryRequest) EncodeMsgpack(enc *msgpack.Encoder) error { return nil } -func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { +func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error { sre, err := UnmarshalSignedRegistryEntry(message.Data) if err != nil { return err @@ -50,7 +49,7 @@ func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message base. return nil } -func (s *RegistryEntryRequest) HandleMessage(message base.IncomingMessageData) error { +func (s *RegistryEntryRequest) HandleMessage(message IncomingMessageData) error { peer := message.Peer services := message.Services return services.Registry().Set(s.sre, false, peer) diff --git a/protocol/registry_query.go b/protocol/registry_query.go index e18a9a5..b6e3d9e 100644 --- a/protocol/registry_query.go +++ b/protocol/registry_query.go @@ -1,17 +1,16 @@ package protocol import ( - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" ) -var _ base.IncomingMessage = (*RegistryQuery)(nil) -var _ base.EncodeableMessage = (*RegistryQuery)(nil) +var _ IncomingMessage = (*RegistryQuery)(nil) +var _ EncodeableMessage = (*RegistryQuery)(nil) type RegistryQuery struct { pk []byte - base.HandshakeRequirement + HandshakeRequirement } func NewEmptyRegistryQuery() *RegistryQuery { @@ -39,7 +38,7 @@ func (s *RegistryQuery) EncodeMsgpack(enc *msgpack.Encoder) error { return nil } -func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { +func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error { pk, err := dec.DecodeBytes() if err != nil { return err @@ -50,7 +49,7 @@ func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message base.Incomin return nil } -func (s *RegistryQuery) HandleMessage(message base.IncomingMessageData) error { +func (s *RegistryQuery) HandleMessage(message IncomingMessageData) error { services := message.Services peer := message.Peer sre, err := services.Registry().Get(s.pk) diff --git a/protocol/signed/announce_peers.go b/protocol/signed/announce_peers.go index 85380b6..0c757c5 100644 --- a/protocol/signed/announce_peers.go +++ b/protocol/signed/announce_peers.go @@ -3,7 +3,7 @@ package signed import ( "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" "net/url" @@ -17,7 +17,7 @@ type AnnouncePeers struct { peer net.Peer connectionUris []*url.URL peersToSend []net.Peer - base.HandshakeRequirement + protocol.HandshakeRequirement } func (a *AnnouncePeers) PeersToSend() []net.Peer { diff --git a/protocol/signed/handshake_done.go b/protocol/signed/handshake_done.go index 4c6bc70..0cd129f 100644 --- a/protocol/signed/handshake_done.go +++ b/protocol/signed/handshake_done.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/utils" @@ -21,7 +22,7 @@ type HandshakeDone struct { supportedFeatures int connectionUris []*url.URL handshake []byte - base.HandshakeRequirement + protocol.HandshakeRequirement } func NewHandshakeDoneRequest(handshake []byte, supportedFeatures int, connectionUris []*url.URL) *HandshakeDone { diff --git a/protocol/signed/signed.go b/protocol/signed/signed.go index 56b9dfa..08f3d91 100644 --- a/protocol/signed/signed.go +++ b/protocol/signed/signed.go @@ -2,20 +2,20 @@ package signed import ( "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" ) type IncomingMessageDataSigned struct { - base.IncomingMessageData + protocol.IncomingMessageData NodeId *encoding.NodeId } type IncomingMessageSigned interface { HandleMessage(message IncomingMessageDataSigned) error DecodeMessage(dec *msgpack.Decoder, message IncomingMessageDataSigned) error - base.HandshakeRequirer + protocol.HandshakeRequirer } var ( diff --git a/protocol/signed/signed_message.go b/protocol/signed/signed_message.go index 1a8405a..9f937b3 100644 --- a/protocol/signed/signed_message.go +++ b/protocol/signed/signed_message.go @@ -5,7 +5,7 @@ import ( "errors" "git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" "go.uber.org/zap" @@ -13,9 +13,9 @@ import ( ) var ( - _ base.IncomingMessage = (*SignedMessage)(nil) - _ msgpack.CustomDecoder = (*signedMessageReader)(nil) - _ msgpack.CustomEncoder = (*SignedMessage)(nil) + _ protocol.IncomingMessage = (*SignedMessage)(nil) + _ msgpack.CustomDecoder = (*signedMessageReader)(nil) + _ msgpack.CustomEncoder = (*SignedMessage)(nil) ) var ( @@ -26,7 +26,7 @@ type SignedMessage struct { nodeId *encoding.NodeId signature []byte message []byte - base.HandshakeRequirement + protocol.HandshakeRequirement } func (s *SignedMessage) NodeId() *encoding.NodeId { @@ -81,7 +81,7 @@ func NewSignedMessage() *SignedMessage { return sm } -func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error { +func (s *SignedMessage) HandleMessage(message protocol.IncomingMessageData) error { var payload signedMessageReader peer := message.Peer logger := message.Logger @@ -116,7 +116,7 @@ func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error { return nil } -func (s *SignedMessage) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { +func (s *SignedMessage) DecodeMessage(dec *msgpack.Decoder, message protocol.IncomingMessageData) error { nodeId, err := dec.DecodeBytes() if err != nil { return err diff --git a/protocol/storage_location.go b/protocol/storage_location.go index 327a1e5..da17f1f 100644 --- a/protocol/storage_location.go +++ b/protocol/storage_location.go @@ -5,7 +5,6 @@ import ( "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/utils" @@ -14,7 +13,7 @@ import ( "go.uber.org/zap" ) -var _ base.IncomingMessage = (*StorageLocation)(nil) +var _ IncomingMessage = (*StorageLocation)(nil) type StorageLocation struct { hash *encoding.Multihash @@ -23,7 +22,7 @@ type StorageLocation struct { parts []string publicKey []byte signature []byte - base.HandshakeRequirement + HandshakeRequirement } func NewStorageLocation() *StorageLocation { @@ -34,11 +33,11 @@ func NewStorageLocation() *StorageLocation { return sl } -func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { +func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error { // nop, we use the incoming message -> original already stored return nil } -func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error { +func (s *StorageLocation) HandleMessage(message IncomingMessageData) error { msg := message.Original services := message.Services peer := message.Peer diff --git a/service/default/p2p.go b/service/default/p2p.go index 2606357..75dd76f 100644 --- a/service/default/p2p.go +++ b/service/default/p2p.go @@ -9,7 +9,6 @@ import ( "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/protocol" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/structs" @@ -419,7 +418,7 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) { }) peer.ListenForMessages(func(message []byte) error { - var reader base.IncomingMessageReader + var reader protocol.IncomingMessageReader err := msgpack.Unmarshal(message, &reader) if err != nil { @@ -434,7 +433,7 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) { return fmt.Errorf("unknown message type: %d", reader.Kind) } - data := base.IncomingMessageData{ + data := protocol.IncomingMessageData{ Original: message, Data: reader.Data, Ctx: context.Background(),