From 6bf557346d1ed0d9e0a8607c62f4d8d169888aac Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Wed, 10 Jan 2024 06:21:03 -0500 Subject: [PATCH] feat: initial registry service support --- go.mod | 1 + interfaces/p2p.go | 4 +- interfaces/registry.go | 27 ++++ interfaces/service.go | 1 + node/node.go | 9 ++ node/services.go | 7 +- protocol/message.go | 6 + protocol/registry.go | 144 +++++++++++++++++++ protocol/registry_entry.go | 59 ++++++++ protocol/registry_query.go | 66 +++++++++ service/registry.go | 279 +++++++++++++++++++++++++++++++++++++ types/protocol.go | 2 + types/registry.go | 2 + 13 files changed, 603 insertions(+), 4 deletions(-) create mode 100644 interfaces/registry.go create mode 100644 protocol/registry.go create mode 100644 protocol/registry_entry.go create mode 100644 protocol/registry_query.go create mode 100644 service/registry.go diff --git a/go.mod b/go.mod index 77ab2d2..85d2efd 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.6.0 github.com/multiformats/go-multibase v0.2.0 + github.com/olebedev/emitter v0.0.0-20230411050614-349169dec2ba github.com/stretchr/testify v1.8.1 github.com/vmihailenco/msgpack/v5 v5.4.1 go.etcd.io/bbolt v1.3.8 diff --git a/interfaces/p2p.go b/interfaces/p2p.go index f5b1063..8bc02b1 100644 --- a/interfaces/p2p.go +++ b/interfaces/p2p.go @@ -13,9 +13,6 @@ import ( type P2PService interface { Node() Node Peers() structs.Map - Start() error - Stop() error - Init() error ConnectToNode(connectionUris []*url.URL, retried bool) error OnNewPeer(peer net.Peer, verifyId bool) error OnNewPeerListen(peer net.Peer, verifyId bool) @@ -28,4 +25,5 @@ type P2PService interface { UpVote(nodeId *encoding.NodeId) error DownVote(nodeId *encoding.NodeId) error NodeId() *encoding.NodeId + Service } diff --git a/interfaces/registry.go b/interfaces/registry.go new file mode 100644 index 0000000..3366ceb --- /dev/null +++ b/interfaces/registry.go @@ -0,0 +1,27 @@ +package interfaces + +import "git.lumeweb.com/LumeWeb/libs5-go/net" + +type SignedRegistryEntry interface { + PK() []byte + Revision() uint64 + Data() []byte + Signature() []byte + SetPK(pk []byte) + SetRevision(revision uint64) + SetData(data []byte) + SetSignature(signature []byte) + Verify() bool +} + +type RegistryEntry interface { + Sign() SignedRegistryEntry +} + +type RegistryService interface { + Set(sre SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error + Get(pk []byte) (SignedRegistryEntry, error) + BroadcastEntry(sre SignedRegistryEntry, receivedFrom net.Peer) error + SendRegistryRequest(pk []byte) error + Service +} diff --git a/interfaces/service.go b/interfaces/service.go index ba0e61e..8b24d96 100644 --- a/interfaces/service.go +++ b/interfaces/service.go @@ -10,4 +10,5 @@ type Service interface { } type Services interface { P2P() P2PService + Registry() RegistryService } diff --git a/node/node.go b/node/node.go index aa964fe..a38430c 100644 --- a/node/node.go +++ b/node/node.go @@ -103,6 +103,15 @@ func (n *NodeImpl) Start() error { if err != nil { return err } + err = n.Services().Registry().Init() + if err != nil { + return err + } + + err = n.Services().Registry().Start() + if err != nil { + return err + } n.started = true return nil diff --git a/node/services.go b/node/services.go index 207f5cc..484c107 100644 --- a/node/services.go +++ b/node/services.go @@ -7,7 +7,12 @@ var ( ) type ServicesImpl struct { - p2p interfaces.P2PService + p2p interfaces.P2PService + registry interfaces.RegistryService +} + +func (s *ServicesImpl) Registry() interfaces.RegistryService { + return s.registry } func NewServices(p2p interfaces.P2PService) *ServicesImpl { diff --git a/protocol/message.go b/protocol/message.go index 47564e1..902e22d 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -27,6 +27,12 @@ func Init() { RegisterMessageType(int(types.RecordTypeStorageLocation), func() base.IncomingMessage { return NewStorageLocation() }) + RegisterMessageType(int(types.RecordTypeRegistryEntry), func() base.IncomingMessage { + return NewEmptyRegistryEntryRequest() + }) + RegisterMessageType(int(types.ProtocolMethodRegistryQuery), func() base.IncomingMessage { + return NewEmptyRegistryQuery() + }) RegisterMessageType(int(types.ProtocolMethodSignedMessage), func() base.IncomingMessage { return signed.NewSignedMessage() }) diff --git a/protocol/registry.go b/protocol/registry.go new file mode 100644 index 0000000..7a60a63 --- /dev/null +++ b/protocol/registry.go @@ -0,0 +1,144 @@ +package protocol + +import ( + ed25519p "crypto/ed25519" + "errors" + "git.lumeweb.com/LumeWeb/libs5-go/ed25519" + "git.lumeweb.com/LumeWeb/libs5-go/interfaces" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "git.lumeweb.com/LumeWeb/libs5-go/utils" +) + +var ( + _ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil) + _ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil) +) + +type SignedRegistryEntryImpl struct { + pk []byte + revision uint64 + data []byte + signature []byte +} + +func (s *SignedRegistryEntryImpl) Verify() bool { + return VerifyRegistryEntry(s) +} + +func (s *SignedRegistryEntryImpl) PK() []byte { + return s.pk +} + +func (s *SignedRegistryEntryImpl) SetPK(pk []byte) { + s.pk = pk +} + +func (s *SignedRegistryEntryImpl) Revision() uint64 { + return s.revision +} + +func (s *SignedRegistryEntryImpl) SetRevision(revision uint64) { + s.revision = revision +} + +func (s *SignedRegistryEntryImpl) Data() []byte { + return s.data +} + +func (s *SignedRegistryEntryImpl) SetData(data []byte) { + s.data = data +} + +func (s *SignedRegistryEntryImpl) Signature() []byte { + return s.signature +} + +func (s *SignedRegistryEntryImpl) SetSignature(signature []byte) { + s.signature = signature +} + +func NewSignedRegistryEntry(pk []byte, revision uint64, data []byte, signature []byte) interfaces.SignedRegistryEntry { + return &SignedRegistryEntryImpl{ + pk: pk, + revision: revision, + data: data, + signature: signature, + } +} + +type RegistryEntryImpl struct { + kp ed25519.KeyPairEd25519 + data []byte + revision uint64 +} + +func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.RegistryEntry { + return &RegistryEntryImpl{ + kp: kp, + data: data, + revision: revision, + } +} + +func (r *RegistryEntryImpl) Sign() interfaces.SignedRegistryEntry { + return SignRegistryEntry(r.kp, r.data, r.revision) +} + +func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.SignedRegistryEntry { + buffer := MarshalRegistryEntry(data, revision) + + privateKey := kp.ExtractBytes() + signature := ed25519p.Sign(privateKey, buffer) + + return NewSignedRegistryEntry(kp.PublicKey(), uint64(revision), data, signature) +} +func VerifyRegistryEntry(sre interfaces.SignedRegistryEntry) bool { + buffer := MarshalRegistryEntry(sre.Data(), sre.Revision()) + publicKey := sre.PK()[1:] + + return ed25519p.Verify(publicKey, buffer, sre.Signature()) +} + +func MarshalSignedRegistryEntry(sre interfaces.SignedRegistryEntry) []byte { + buffer := MarshalRegistryEntry(sre.Data(), sre.Revision()) + buffer = append(buffer, sre.Signature()...) + + return buffer +} +func MarshalRegistryEntry(data []byte, revision uint64) []byte { + var buffer []byte + buffer = append(buffer, byte(types.RecordTypeRegistryEntry)) + + revBytes := utils.EncodeEndian(uint32(revision), 8) + buffer = append(buffer, revBytes...) + + buffer = append(buffer, byte(len(data))) + buffer = append(buffer, data...) + + return buffer +} + +func UnmarshalSignedRegistryEntry(event []byte) (sre interfaces.SignedRegistryEntry, err error) { + if len(event) < 43 { + return nil, errors.New("Invalid registry entry") + } + + dataLength := int(event[42]) + if len(event) < 43+dataLength { + return nil, errors.New("Invalid registry entry") + } + + pk := event[1:34] + revisionBytes := event[34:42] + revision := utils.DecodeEndian(revisionBytes) + signatureStart := 43 + dataLength + var signature []byte + + if signatureStart < len(event) { + signature = event[signatureStart:] + } else { + return nil, errors.New("Invalid signature") + } + + return NewSignedRegistryEntry(pk, uint64(revision), event[43:signatureStart], signature), nil +} diff --git a/protocol/registry_entry.go b/protocol/registry_entry.go new file mode 100644 index 0000000..1eaa2e7 --- /dev/null +++ b/protocol/registry_entry.go @@ -0,0 +1,59 @@ +package protocol + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/interfaces" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "github.com/vmihailenco/msgpack/v5" +) + +var _ base.IncomingMessageTyped = (*RegistryEntryRequest)(nil) +var _ base.EncodeableMessage = (*RegistryEntryRequest)(nil) + +type RegistryEntryRequest struct { + sre interfaces.SignedRegistryEntry + base.IncomingMessageTypedImpl + base.IncomingMessageHandler +} + +func NewEmptyRegistryEntryRequest() *RegistryEntryRequest { + return &RegistryEntryRequest{} +} +func NewRegistryEntryRequest(sre interfaces.SignedRegistryEntry) *RegistryEntryRequest { + return &RegistryEntryRequest{sre: sre} +} + +func (s *RegistryEntryRequest) EncodeMsgpack(enc *msgpack.Encoder) error { + err := enc.EncodeInt(int64(types.RecordTypeRegistryEntry)) + if err != nil { + return err + } + + err = enc.EncodeBytes(MarshalSignedRegistryEntry(s.sre)) + if err != nil { + return err + } + + return nil +} + +func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder) error { + pk, err := dec.DecodeBytes() + if err != nil { + return err + } + + sre, err := UnmarshalSignedRegistryEntry(pk) + if err != nil { + return err + } + + s.sre = sre + + return nil +} + +func (s *RegistryEntryRequest) HandleMessage(node interfaces.Node, peer net.Peer, verifyId bool) error { + return node.Services().Registry().Set(s.sre, false, peer) +} diff --git a/protocol/registry_query.go b/protocol/registry_query.go new file mode 100644 index 0000000..fa8a83b --- /dev/null +++ b/protocol/registry_query.go @@ -0,0 +1,66 @@ +package protocol + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/interfaces" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "github.com/vmihailenco/msgpack/v5" +) + +var _ base.IncomingMessageTyped = (*RegistryQuery)(nil) +var _ base.EncodeableMessage = (*RegistryQuery)(nil) + +type RegistryQuery struct { + pk []byte + base.IncomingMessageTypedImpl + base.IncomingMessageHandler +} + +func NewEmptyRegistryQuery() *RegistryQuery { + return &RegistryQuery{} +} +func NewRegistryQuery(pk []byte) *RegistryQuery { + return &RegistryQuery{pk: pk} +} + +func (s *RegistryQuery) EncodeMsgpack(enc *msgpack.Encoder) error { + err := enc.EncodeInt(int64(types.ProtocolMethodRegistryQuery)) + if err != nil { + return err + } + + err = enc.EncodeBytes(s.pk) + if err != nil { + return err + } + + return nil +} + +func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder) error { + pk, err := dec.DecodeBytes() + if err != nil { + return err + } + + s.pk = pk + + return nil +} + +func (s *RegistryQuery) HandleMessage(node interfaces.Node, peer net.Peer, verifyId bool) error { + sre, err := node.Services().Registry().Get(s.pk) + if err != nil { + return err + } + + if sre != nil { + err := peer.SendMessage(MarshalSignedRegistryEntry(sre)) + if err != nil { + return err + } + } + + return nil +} diff --git a/service/registry.go b/service/registry.go new file mode 100644 index 0000000..707123e --- /dev/null +++ b/service/registry.go @@ -0,0 +1,279 @@ +package service + +import ( + "errors" + "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/interfaces" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "git.lumeweb.com/LumeWeb/libs5-go/utils" + "github.com/olebedev/emitter" + "github.com/vmihailenco/msgpack/v5" + "go.etcd.io/bbolt" + "go.uber.org/zap" + "time" +) + +var _ interfaces.RegistryService = (*RegistryImpl)(nil) + +const registryBucketName = "registry" + +type RegistryImpl struct { + node interfaces.Node + logger *zap.Logger + streams structs.Map + subs structs.Map +} + +func (r *RegistryImpl) Node() interfaces.Node { + return r.node +} + +func (r *RegistryImpl) Start() error { + return nil +} + +func (r *RegistryImpl) Stop() error { + return nil +} + +func (r *RegistryImpl) Init() error { + return utils.CreateBucket(registryBucketName, r.node.Db()) +} + +func NewRegistry(node interfaces.Node, logger *zap.Logger) *RegistryImpl { + return &RegistryImpl{ + node: node, + logger: logger, + streams: structs.NewMap(), + subs: structs.NewMap(), + } +} +func (r *RegistryImpl) Set(sre interfaces.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error { + hash := encoding.NewMultihash(sre.PK()) + hashString, err := hash.ToString() + if err != nil { + return err + } + pid, err := receivedFrom.Id().ToString() + if err != nil { + return err + } + r.logger.Debug("[registry] set", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid)) + + if !trusted { + if len(sre.PK()) != 33 { + return errors.New("Invalid pubkey") + } + if int(sre.PK()[0]) != int(types.HashTypeEd25519) { + return errors.New("Only ed25519 keys are supported") + } + if sre.Revision() < 0 || sre.Revision() > 281474976710656 { + return errors.New("Invalid revision") + } + if len(sre.Data()) > types.RegistryMaxDataSize { + return errors.New("Data too long") + } + + if !sre.Verify() { + return errors.New("Invalid signature found") + } + } + + existingEntry, err := r.getFromDB(sre.PK()) + if err != nil { + return err + } + + if existingEntry != nil { + if receivedFrom != nil { + if existingEntry.Revision() == sre.Revision() { + return nil + } else if existingEntry.Revision() > sre.Revision() { + updateMessage := protocol.MarshalSignedRegistryEntry(existingEntry) + err := receivedFrom.SendMessage(updateMessage) + if err != nil { + return err + } + return nil + } + } + + if existingEntry.Revision() >= sre.Revision() { + return errors.New("Revision number too low") + } + } + + key := encoding.NewMultihash(sre.PK()) + keyString, err := key.ToString() + if err != nil { + return err + } + + eventObj, ok := r.streams.Get(keyString) + if ok { + event := eventObj.(*emitter.Emitter) + event.Emit("fire", sre) + } + + err = r.node.Db().Update(func(txn *bbolt.Tx) error { + bucket := txn.Bucket([]byte(registryBucketName)) + err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre)) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return err + } + + err = r.BroadcastEntry(sre, receivedFrom) + if err != nil { + return err + } + + return nil +} +func (r *RegistryImpl) BroadcastEntry(sre interfaces.SignedRegistryEntry, receivedFrom net.Peer) error { + hash := encoding.NewMultihash(sre.PK()) + hashString, err := hash.ToString() + if err != nil { + return err + } + pid, err := receivedFrom.Id().ToString() + if err != nil { + return err + } + r.logger.Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid)) + updateMessage := protocol.MarshalSignedRegistryEntry(sre) + + for _, p := range r.node.Services().P2P().Peers().Values() { + peer, ok := p.(net.Peer) + if !ok { + continue + } + if receivedFrom == nil || peer.Id().Equals(receivedFrom.Id()) { + err := peer.SendMessage(updateMessage) + if err != nil { + pid, err := peer.Id().ToString() + if err != nil { + return err + } + r.logger.Error("Failed to send registry broadcast", zap.String("peer", pid), zap.Error(err)) + return err + } + } + } + + return nil +} +func (r *RegistryImpl) SendRegistryRequest(pk []byte) error { + query := protocol.NewRegistryQuery(pk) + + request, err := msgpack.Marshal(query) + if err != nil { + return err + } + + // Iterate over peers and send the request + for _, peerVal := range r.node.Services().P2P().Peers().Values() { + peer, ok := peerVal.(net.Peer) + if !ok { + continue + } + err := peer.SendMessage(request) + if err != nil { + pid, err := peer.Id().ToString() + if err != nil { + return err + } + r.logger.Error("Failed to send registry request", zap.String("peer", pid), zap.Error(err)) + return err + } + } + + return nil +} +func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) { + key := encoding.NewMultihash(pk) + keyString, err := key.ToString() + if err != nil { + return nil, err + } + + if r.subs.Contains(keyString) { + r.logger.Debug(fmt.Sprintf("[registry] get (cached) %s", key), zap.String("key", keyString)) + res, err := r.getFromDB(pk) + if err != nil { + return nil, err + } + if res != nil { + return res, nil + } + + err = r.SendRegistryRequest(pk) + if err != nil { + return nil, err + } + time.Sleep(200 * time.Millisecond) + return r.getFromDB(pk) + } + + err = r.SendRegistryRequest(pk) + if err != nil { + return nil, err + } + r.subs.Put(keyString, key) + if !r.streams.Contains(keyString) { + event := &emitter.Emitter{} + r.streams.Put(keyString, event) + } + + res, err := r.getFromDB(pk) + if err != nil { + return nil, err + } + if res == nil { + r.logger.Debug(fmt.Sprintf("[registry] get (cached) %s", key), zap.String("key", keyString)) + for i := 0; i < 200; i++ { + time.Sleep(10 * time.Millisecond) + res, err := r.getFromDB(pk) + if err != nil { + return nil, err + } + if res != nil { + break + } + } + } else { + r.logger.Debug(fmt.Sprintf("[registry] get (cached) %s", key), zap.String("key", keyString)) + time.Sleep(200 * time.Millisecond) + } + return r.getFromDB(pk) +} + +func (r *RegistryImpl) getFromDB(pk []byte) (sre interfaces.SignedRegistryEntry, err error) { + err = r.node.Db().View(func(txn *bbolt.Tx) error { + bucket := txn.Bucket([]byte(registryBucketName)) + val := bucket.Get(pk) + if val != nil { + entry, err := protocol.UnmarshalSignedRegistryEntry(val) + if err != nil { + return err + } + sre = entry + return nil + } + return nil + }) + if err != nil { + return nil, err + } + + return sre, nil +} diff --git a/types/protocol.go b/types/protocol.go index 4b5809a..5b1a6f9 100644 --- a/types/protocol.go +++ b/types/protocol.go @@ -11,6 +11,7 @@ const ( ProtocolMethodRegistryQuery ProtocolMethod = 0xD RecordTypeStorageLocation ProtocolMethod = 0x05 RecordTypeStreamEvent ProtocolMethod = 0x09 + RecordTypeRegistryEntry ProtocolMethod = 0x07 ) var ProtocolMethodMap = map[ProtocolMethod]string{ @@ -22,4 +23,5 @@ var ProtocolMethodMap = map[ProtocolMethod]string{ ProtocolMethodRegistryQuery: "RegistryQuery", RecordTypeStorageLocation: "StorageLocation", RecordTypeStreamEvent: "StreamEvent", + RecordTypeRegistryEntry: "RegistryEntry", } diff --git a/types/registry.go b/types/registry.go index 5be366e..56364ea 100644 --- a/types/registry.go +++ b/types/registry.go @@ -11,3 +11,5 @@ var RegistryTypeMap = map[RegistryType]string{ RegistryTypeCID: "CID", RegistryTypeEncryptedCID: "EncryptedCID", } + +const RegistryMaxDataSize = 64