refactor: base pkg is not needed

This commit is contained in:
Derrick Hammer 2024-01-29 22:25:21 -05:00
parent bd08d75da4
commit af58aac985
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
14 changed files with 121 additions and 136 deletions

View File

@ -1,73 +0,0 @@
package base
import (
"context"
"git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap"
"io"
)
var (
_ msgpack.CustomDecoder = (*IncomingMessageReader)(nil)
)
type IncomingMessage interface {
HandleMessage(message IncomingMessageData) error
DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error
HandshakeRequirer
}
type IncomingMessageData struct {
Original []byte
Data []byte
Ctx context.Context
Services service.Services
Logger *zap.Logger
Peer net.Peer
Config *config.NodeConfig
VerifyId bool
}
type IncomingMessageReader struct {
Kind int
Data []byte
}
func (i *IncomingMessageReader) DecodeMsgpack(dec *msgpack.Decoder) error {
kind, err := dec.DecodeInt()
if err != nil {
return err
}
i.Kind = kind
raw, err := io.ReadAll(dec.Buffered())
if err != nil {
return err
}
i.Data = raw
return nil
}
type HandshakeRequirer interface {
RequiresHandshake() bool
SetRequiresHandshake(value bool)
}
type HandshakeRequirement struct {
requiresHandshake bool
}
func (hr *HandshakeRequirement) RequiresHandshake() bool {
return hr.requiresHandshake
}
func (hr *HandshakeRequirement) SetRequiresHandshake(value bool) {
hr.requiresHandshake = value
}

View File

@ -1,7 +0,0 @@
package base
import "github.com/vmihailenco/msgpack/v5"
type EncodeableMessage interface {
msgpack.CustomEncoder
}

View File

@ -2,20 +2,19 @@ package protocol
import ( import (
"fmt" "fmt"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
) )
var _ base.EncodeableMessage = (*HandshakeOpen)(nil) var _ EncodeableMessage = (*HandshakeOpen)(nil)
var _ base.IncomingMessage = (*HandshakeOpen)(nil) var _ IncomingMessage = (*HandshakeOpen)(nil)
type HandshakeOpen struct { type HandshakeOpen struct {
challenge []byte challenge []byte
networkId string networkId string
handshake []byte handshake []byte
base.HandshakeRequirement HandshakeRequirement
} }
func (h *HandshakeOpen) SetHandshake(handshake []byte) { func (h *HandshakeOpen) SetHandshake(handshake []byte) {
@ -30,8 +29,6 @@ func (h HandshakeOpen) NetworkId() string {
return h.networkId return h.networkId
} }
var _ base.EncodeableMessage = (*HandshakeOpen)(nil)
func NewHandshakeOpen(challenge []byte, networkId string) *HandshakeOpen { func NewHandshakeOpen(challenge []byte, networkId string) *HandshakeOpen {
ho := &HandshakeOpen{ ho := &HandshakeOpen{
challenge: challenge, challenge: challenge,
@ -62,7 +59,7 @@ func (h HandshakeOpen) EncodeMsgpack(enc *msgpack.Encoder) error {
return nil return nil
} }
func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
handshake, err := dec.DecodeBytes() handshake, err := dec.DecodeBytes()
if err != nil { if err != nil {
@ -93,7 +90,7 @@ func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message base.Incomin
return nil return nil
} }
func (h *HandshakeOpen) HandleMessage(message base.IncomingMessageData) error { func (h *HandshakeOpen) HandleMessage(message IncomingMessageData) error {
peer := message.Peer peer := message.Peer
services := message.Services services := message.Services

View File

@ -13,12 +13,12 @@ import (
) )
var _ base.EncodeableMessage = (*HashQuery)(nil) var _ base.EncodeableMessage = (*HashQuery)(nil)
var _ base.IncomingMessage = (*HashQuery)(nil) var _ IncomingMessage = (*HashQuery)(nil)
type HashQuery struct { type HashQuery struct {
hash *encoding.Multihash hash *encoding.Multihash
kinds []types.StorageLocationType kinds []types.StorageLocationType
base.HandshakeRequirement HandshakeRequirement
} }
func NewHashQuery() *HashQuery { func NewHashQuery() *HashQuery {
@ -47,7 +47,7 @@ func (h HashQuery) Kinds() []types.StorageLocationType {
return h.kinds return h.kinds
} }
func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { func (h *HashQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
hash, err := dec.DecodeBytes() hash, err := dec.DecodeBytes()
if err != nil { if err != nil {
@ -88,7 +88,7 @@ func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error {
return nil return nil
} }
func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { func (h *HashQuery) HandleMessage(message IncomingMessageData) error {
peer := message.Peer peer := message.Peer
services := message.Services services := message.Services
logger := message.Logger logger := message.Logger

View File

@ -1,48 +1,47 @@
package protocol package protocol
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
) )
var ( var (
messageTypes map[int]func() base.IncomingMessage messageTypes map[int]func() IncomingMessage
) )
func RegisterProtocols() { func RegisterProtocols() {
messageTypes = make(map[int]func() base.IncomingMessage) messageTypes = make(map[int]func() IncomingMessage)
// Register factory functions instead of instances // Register factory functions instead of instances
RegisterMessageType(int(types.ProtocolMethodHandshakeOpen), func() base.IncomingMessage { RegisterMessageType(int(types.ProtocolMethodHandshakeOpen), func() IncomingMessage {
return NewHandshakeOpen([]byte{}, "") return NewHandshakeOpen([]byte{}, "")
}) })
RegisterMessageType(int(types.ProtocolMethodHashQuery), func() base.IncomingMessage { RegisterMessageType(int(types.ProtocolMethodHashQuery), func() IncomingMessage {
return NewHashQuery() return NewHashQuery()
}) })
RegisterMessageType(int(types.RecordTypeStorageLocation), func() base.IncomingMessage { RegisterMessageType(int(types.RecordTypeStorageLocation), func() IncomingMessage {
return NewStorageLocation() return NewStorageLocation()
}) })
RegisterMessageType(int(types.RecordTypeRegistryEntry), func() base.IncomingMessage { RegisterMessageType(int(types.RecordTypeRegistryEntry), func() IncomingMessage {
return NewEmptyRegistryEntryRequest() return NewEmptyRegistryEntryRequest()
}) })
RegisterMessageType(int(types.ProtocolMethodRegistryQuery), func() base.IncomingMessage { RegisterMessageType(int(types.ProtocolMethodRegistryQuery), func() IncomingMessage {
return NewEmptyRegistryQuery() return NewEmptyRegistryQuery()
}) })
RegisterMessageType(int(types.ProtocolMethodSignedMessage), func() base.IncomingMessage { RegisterMessageType(int(types.ProtocolMethodSignedMessage), func() IncomingMessage {
return signed.NewSignedMessage() return signed.NewSignedMessage()
}) })
} }
func RegisterMessageType(messageType int, factoryFunc func() base.IncomingMessage) { func RegisterMessageType(messageType int, factoryFunc func() IncomingMessage) {
if factoryFunc == nil { if factoryFunc == nil {
panic("factoryFunc cannot be nil") panic("factoryFunc cannot be nil")
} }
messageTypes[messageType] = factoryFunc messageTypes[messageType] = factoryFunc
} }
func GetMessageType(kind int) (base.IncomingMessage, bool) { func GetMessageType(kind int) (IncomingMessage, bool) {
value, ok := messageTypes[kind] value, ok := messageTypes[kind]
if !ok { if !ok {
return nil, false return nil, false

View File

@ -1,7 +1,14 @@
package protocol package protocol
import ( import (
"context"
"crypto/rand" "crypto/rand"
"git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap"
"io"
"math" "math"
) )
@ -26,3 +33,68 @@ func CalculateNodeScore(goodResponses, badResponses int) float64 {
return score return score
} }
var (
_ msgpack.CustomDecoder = (*IncomingMessageReader)(nil)
)
type IncomingMessage interface {
HandleMessage(message IncomingMessageData) error
DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error
HandshakeRequirer
}
type EncodeableMessage interface {
msgpack.CustomEncoder
}
type IncomingMessageData struct {
Original []byte
Data []byte
Ctx context.Context
Services service.Services
Logger *zap.Logger
Peer net.Peer
Config *config.NodeConfig
VerifyId bool
}
type IncomingMessageReader struct {
Kind int
Data []byte
}
func (i *IncomingMessageReader) DecodeMsgpack(dec *msgpack.Decoder) error {
kind, err := dec.DecodeInt()
if err != nil {
return err
}
i.Kind = kind
raw, err := io.ReadAll(dec.Buffered())
if err != nil {
return err
}
i.Data = raw
return nil
}
type HandshakeRequirer interface {
RequiresHandshake() bool
SetRequiresHandshake(value bool)
}
type HandshakeRequirement struct {
requiresHandshake bool
}
func (hr *HandshakeRequirement) RequiresHandshake() bool {
return hr.requiresHandshake
}
func (hr *HandshakeRequirement) SetRequiresHandshake(value bool) {
hr.requiresHandshake = value
}

View File

@ -1,17 +1,16 @@
package protocol package protocol
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
) )
var _ base.IncomingMessage = (*RegistryEntryRequest)(nil) var _ IncomingMessage = (*RegistryEntryRequest)(nil)
var _ base.EncodeableMessage = (*RegistryEntryRequest)(nil) var _ EncodeableMessage = (*RegistryEntryRequest)(nil)
type RegistryEntryRequest struct { type RegistryEntryRequest struct {
sre SignedRegistryEntry sre SignedRegistryEntry
base.HandshakeRequirement HandshakeRequirement
} }
func NewEmptyRegistryEntryRequest() *RegistryEntryRequest { func NewEmptyRegistryEntryRequest() *RegistryEntryRequest {
@ -39,7 +38,7 @@ func (s *RegistryEntryRequest) EncodeMsgpack(enc *msgpack.Encoder) error {
return nil return nil
} }
func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
sre, err := UnmarshalSignedRegistryEntry(message.Data) sre, err := UnmarshalSignedRegistryEntry(message.Data)
if err != nil { if err != nil {
return err return err
@ -50,7 +49,7 @@ func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message base.
return nil return nil
} }
func (s *RegistryEntryRequest) HandleMessage(message base.IncomingMessageData) error { func (s *RegistryEntryRequest) HandleMessage(message IncomingMessageData) error {
peer := message.Peer peer := message.Peer
services := message.Services services := message.Services
return services.Registry().Set(s.sre, false, peer) return services.Registry().Set(s.sre, false, peer)

View File

@ -1,17 +1,16 @@
package protocol package protocol
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
) )
var _ base.IncomingMessage = (*RegistryQuery)(nil) var _ IncomingMessage = (*RegistryQuery)(nil)
var _ base.EncodeableMessage = (*RegistryQuery)(nil) var _ EncodeableMessage = (*RegistryQuery)(nil)
type RegistryQuery struct { type RegistryQuery struct {
pk []byte pk []byte
base.HandshakeRequirement HandshakeRequirement
} }
func NewEmptyRegistryQuery() *RegistryQuery { func NewEmptyRegistryQuery() *RegistryQuery {
@ -39,7 +38,7 @@ func (s *RegistryQuery) EncodeMsgpack(enc *msgpack.Encoder) error {
return nil return nil
} }
func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
pk, err := dec.DecodeBytes() pk, err := dec.DecodeBytes()
if err != nil { if err != nil {
return err return err
@ -50,7 +49,7 @@ func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message base.Incomin
return nil return nil
} }
func (s *RegistryQuery) HandleMessage(message base.IncomingMessageData) error { func (s *RegistryQuery) HandleMessage(message IncomingMessageData) error {
services := message.Services services := message.Services
peer := message.Peer peer := message.Peer
sre, err := services.Registry().Get(s.pk) sre, err := services.Registry().Get(s.pk)

View File

@ -3,7 +3,7 @@ package signed
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
"net/url" "net/url"
@ -17,7 +17,7 @@ type AnnouncePeers struct {
peer net.Peer peer net.Peer
connectionUris []*url.URL connectionUris []*url.URL
peersToSend []net.Peer peersToSend []net.Peer
base.HandshakeRequirement protocol.HandshakeRequirement
} }
func (a *AnnouncePeers) PeersToSend() []net.Peer { func (a *AnnouncePeers) PeersToSend() []net.Peer {

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils" "git.lumeweb.com/LumeWeb/libs5-go/utils"
@ -21,7 +22,7 @@ type HandshakeDone struct {
supportedFeatures int supportedFeatures int
connectionUris []*url.URL connectionUris []*url.URL
handshake []byte handshake []byte
base.HandshakeRequirement protocol.HandshakeRequirement
} }
func NewHandshakeDoneRequest(handshake []byte, supportedFeatures int, connectionUris []*url.URL) *HandshakeDone { func NewHandshakeDoneRequest(handshake []byte, supportedFeatures int, connectionUris []*url.URL) *HandshakeDone {

View File

@ -2,20 +2,20 @@ package signed
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
) )
type IncomingMessageDataSigned struct { type IncomingMessageDataSigned struct {
base.IncomingMessageData protocol.IncomingMessageData
NodeId *encoding.NodeId NodeId *encoding.NodeId
} }
type IncomingMessageSigned interface { type IncomingMessageSigned interface {
HandleMessage(message IncomingMessageDataSigned) error HandleMessage(message IncomingMessageDataSigned) error
DecodeMessage(dec *msgpack.Decoder, message IncomingMessageDataSigned) error DecodeMessage(dec *msgpack.Decoder, message IncomingMessageDataSigned) error
base.HandshakeRequirer protocol.HandshakeRequirer
} }
var ( var (

View File

@ -5,7 +5,7 @@ import (
"errors" "errors"
"git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base" "git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap" "go.uber.org/zap"
@ -13,7 +13,7 @@ import (
) )
var ( var (
_ base.IncomingMessage = (*SignedMessage)(nil) _ protocol.IncomingMessage = (*SignedMessage)(nil)
_ msgpack.CustomDecoder = (*signedMessageReader)(nil) _ msgpack.CustomDecoder = (*signedMessageReader)(nil)
_ msgpack.CustomEncoder = (*SignedMessage)(nil) _ msgpack.CustomEncoder = (*SignedMessage)(nil)
) )
@ -26,7 +26,7 @@ type SignedMessage struct {
nodeId *encoding.NodeId nodeId *encoding.NodeId
signature []byte signature []byte
message []byte message []byte
base.HandshakeRequirement protocol.HandshakeRequirement
} }
func (s *SignedMessage) NodeId() *encoding.NodeId { func (s *SignedMessage) NodeId() *encoding.NodeId {
@ -81,7 +81,7 @@ func NewSignedMessage() *SignedMessage {
return sm return sm
} }
func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error { func (s *SignedMessage) HandleMessage(message protocol.IncomingMessageData) error {
var payload signedMessageReader var payload signedMessageReader
peer := message.Peer peer := message.Peer
logger := message.Logger logger := message.Logger
@ -116,7 +116,7 @@ func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error {
return nil return nil
} }
func (s *SignedMessage) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { func (s *SignedMessage) DecodeMessage(dec *msgpack.Decoder, message protocol.IncomingMessageData) error {
nodeId, err := dec.DecodeBytes() nodeId, err := dec.DecodeBytes()
if err != nil { if err != nil {
return err return err

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils" "git.lumeweb.com/LumeWeb/libs5-go/utils"
@ -14,7 +13,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var _ base.IncomingMessage = (*StorageLocation)(nil) var _ IncomingMessage = (*StorageLocation)(nil)
type StorageLocation struct { type StorageLocation struct {
hash *encoding.Multihash hash *encoding.Multihash
@ -23,7 +22,7 @@ type StorageLocation struct {
parts []string parts []string
publicKey []byte publicKey []byte
signature []byte signature []byte
base.HandshakeRequirement HandshakeRequirement
} }
func NewStorageLocation() *StorageLocation { func NewStorageLocation() *StorageLocation {
@ -34,11 +33,11 @@ func NewStorageLocation() *StorageLocation {
return sl return sl
} }
func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message base.IncomingMessageData) error { func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message IncomingMessageData) error {
// nop, we use the incoming message -> original already stored // nop, we use the incoming message -> original already stored
return nil return nil
} }
func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error { func (s *StorageLocation) HandleMessage(message IncomingMessageData) error {
msg := message.Original msg := message.Original
services := message.Services services := message.Services
peer := message.Peer peer := message.Peer

View File

@ -9,7 +9,6 @@ import (
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/structs"
@ -419,7 +418,7 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) {
}) })
peer.ListenForMessages(func(message []byte) error { peer.ListenForMessages(func(message []byte) error {
var reader base.IncomingMessageReader var reader protocol.IncomingMessageReader
err := msgpack.Unmarshal(message, &reader) err := msgpack.Unmarshal(message, &reader)
if err != nil { if err != nil {
@ -434,7 +433,7 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) {
return fmt.Errorf("unknown message type: %d", reader.Kind) return fmt.Errorf("unknown message type: %d", reader.Kind)
} }
data := base.IncomingMessageData{ data := protocol.IncomingMessageData{
Original: message, Original: message,
Data: reader.Data, Data: reader.Data,
Ctx: context.Background(), Ctx: context.Background(),