feat: initial registry service support

This commit is contained in:
Derrick Hammer 2024-01-10 06:21:03 -05:00
parent 528e1a6c27
commit 6bf557346d
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
13 changed files with 603 additions and 4 deletions

1
go.mod
View File

@ -8,6 +8,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.6.0
github.com/multiformats/go-multibase v0.2.0
github.com/olebedev/emitter v0.0.0-20230411050614-349169dec2ba
github.com/stretchr/testify v1.8.1
github.com/vmihailenco/msgpack/v5 v5.4.1
go.etcd.io/bbolt v1.3.8

View File

@ -13,9 +13,6 @@ import (
type P2PService interface {
Node() Node
Peers() structs.Map
Start() error
Stop() error
Init() error
ConnectToNode(connectionUris []*url.URL, retried bool) error
OnNewPeer(peer net.Peer, verifyId bool) error
OnNewPeerListen(peer net.Peer, verifyId bool)
@ -28,4 +25,5 @@ type P2PService interface {
UpVote(nodeId *encoding.NodeId) error
DownVote(nodeId *encoding.NodeId) error
NodeId() *encoding.NodeId
Service
}

27
interfaces/registry.go Normal file
View File

@ -0,0 +1,27 @@
package interfaces
import "git.lumeweb.com/LumeWeb/libs5-go/net"
type SignedRegistryEntry interface {
PK() []byte
Revision() uint64
Data() []byte
Signature() []byte
SetPK(pk []byte)
SetRevision(revision uint64)
SetData(data []byte)
SetSignature(signature []byte)
Verify() bool
}
type RegistryEntry interface {
Sign() SignedRegistryEntry
}
type RegistryService interface {
Set(sre SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error
Get(pk []byte) (SignedRegistryEntry, error)
BroadcastEntry(sre SignedRegistryEntry, receivedFrom net.Peer) error
SendRegistryRequest(pk []byte) error
Service
}

View File

@ -10,4 +10,5 @@ type Service interface {
}
type Services interface {
P2P() P2PService
Registry() RegistryService
}

View File

@ -103,6 +103,15 @@ func (n *NodeImpl) Start() error {
if err != nil {
return err
}
err = n.Services().Registry().Init()
if err != nil {
return err
}
err = n.Services().Registry().Start()
if err != nil {
return err
}
n.started = true
return nil

View File

@ -7,7 +7,12 @@ var (
)
type ServicesImpl struct {
p2p interfaces.P2PService
p2p interfaces.P2PService
registry interfaces.RegistryService
}
func (s *ServicesImpl) Registry() interfaces.RegistryService {
return s.registry
}
func NewServices(p2p interfaces.P2PService) *ServicesImpl {

View File

@ -27,6 +27,12 @@ func Init() {
RegisterMessageType(int(types.RecordTypeStorageLocation), func() base.IncomingMessage {
return NewStorageLocation()
})
RegisterMessageType(int(types.RecordTypeRegistryEntry), func() base.IncomingMessage {
return NewEmptyRegistryEntryRequest()
})
RegisterMessageType(int(types.ProtocolMethodRegistryQuery), func() base.IncomingMessage {
return NewEmptyRegistryQuery()
})
RegisterMessageType(int(types.ProtocolMethodSignedMessage), func() base.IncomingMessage {
return signed.NewSignedMessage()
})

144
protocol/registry.go Normal file
View File

@ -0,0 +1,144 @@
package protocol
import (
ed25519p "crypto/ed25519"
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
)
var (
_ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil)
_ interfaces.SignedRegistryEntry = (*SignedRegistryEntryImpl)(nil)
)
type SignedRegistryEntryImpl struct {
pk []byte
revision uint64
data []byte
signature []byte
}
func (s *SignedRegistryEntryImpl) Verify() bool {
return VerifyRegistryEntry(s)
}
func (s *SignedRegistryEntryImpl) PK() []byte {
return s.pk
}
func (s *SignedRegistryEntryImpl) SetPK(pk []byte) {
s.pk = pk
}
func (s *SignedRegistryEntryImpl) Revision() uint64 {
return s.revision
}
func (s *SignedRegistryEntryImpl) SetRevision(revision uint64) {
s.revision = revision
}
func (s *SignedRegistryEntryImpl) Data() []byte {
return s.data
}
func (s *SignedRegistryEntryImpl) SetData(data []byte) {
s.data = data
}
func (s *SignedRegistryEntryImpl) Signature() []byte {
return s.signature
}
func (s *SignedRegistryEntryImpl) SetSignature(signature []byte) {
s.signature = signature
}
func NewSignedRegistryEntry(pk []byte, revision uint64, data []byte, signature []byte) interfaces.SignedRegistryEntry {
return &SignedRegistryEntryImpl{
pk: pk,
revision: revision,
data: data,
signature: signature,
}
}
type RegistryEntryImpl struct {
kp ed25519.KeyPairEd25519
data []byte
revision uint64
}
func NewRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.RegistryEntry {
return &RegistryEntryImpl{
kp: kp,
data: data,
revision: revision,
}
}
func (r *RegistryEntryImpl) Sign() interfaces.SignedRegistryEntry {
return SignRegistryEntry(r.kp, r.data, r.revision)
}
func SignRegistryEntry(kp ed25519.KeyPairEd25519, data []byte, revision uint64) interfaces.SignedRegistryEntry {
buffer := MarshalRegistryEntry(data, revision)
privateKey := kp.ExtractBytes()
signature := ed25519p.Sign(privateKey, buffer)
return NewSignedRegistryEntry(kp.PublicKey(), uint64(revision), data, signature)
}
func VerifyRegistryEntry(sre interfaces.SignedRegistryEntry) bool {
buffer := MarshalRegistryEntry(sre.Data(), sre.Revision())
publicKey := sre.PK()[1:]
return ed25519p.Verify(publicKey, buffer, sre.Signature())
}
func MarshalSignedRegistryEntry(sre interfaces.SignedRegistryEntry) []byte {
buffer := MarshalRegistryEntry(sre.Data(), sre.Revision())
buffer = append(buffer, sre.Signature()...)
return buffer
}
func MarshalRegistryEntry(data []byte, revision uint64) []byte {
var buffer []byte
buffer = append(buffer, byte(types.RecordTypeRegistryEntry))
revBytes := utils.EncodeEndian(uint32(revision), 8)
buffer = append(buffer, revBytes...)
buffer = append(buffer, byte(len(data)))
buffer = append(buffer, data...)
return buffer
}
func UnmarshalSignedRegistryEntry(event []byte) (sre interfaces.SignedRegistryEntry, err error) {
if len(event) < 43 {
return nil, errors.New("Invalid registry entry")
}
dataLength := int(event[42])
if len(event) < 43+dataLength {
return nil, errors.New("Invalid registry entry")
}
pk := event[1:34]
revisionBytes := event[34:42]
revision := utils.DecodeEndian(revisionBytes)
signatureStart := 43 + dataLength
var signature []byte
if signatureStart < len(event) {
signature = event[signatureStart:]
} else {
return nil, errors.New("Invalid signature")
}
return NewSignedRegistryEntry(pk, uint64(revision), event[43:signatureStart], signature), nil
}

View File

@ -0,0 +1,59 @@
package protocol
import (
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5"
)
var _ base.IncomingMessageTyped = (*RegistryEntryRequest)(nil)
var _ base.EncodeableMessage = (*RegistryEntryRequest)(nil)
type RegistryEntryRequest struct {
sre interfaces.SignedRegistryEntry
base.IncomingMessageTypedImpl
base.IncomingMessageHandler
}
func NewEmptyRegistryEntryRequest() *RegistryEntryRequest {
return &RegistryEntryRequest{}
}
func NewRegistryEntryRequest(sre interfaces.SignedRegistryEntry) *RegistryEntryRequest {
return &RegistryEntryRequest{sre: sre}
}
func (s *RegistryEntryRequest) EncodeMsgpack(enc *msgpack.Encoder) error {
err := enc.EncodeInt(int64(types.RecordTypeRegistryEntry))
if err != nil {
return err
}
err = enc.EncodeBytes(MarshalSignedRegistryEntry(s.sre))
if err != nil {
return err
}
return nil
}
func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder) error {
pk, err := dec.DecodeBytes()
if err != nil {
return err
}
sre, err := UnmarshalSignedRegistryEntry(pk)
if err != nil {
return err
}
s.sre = sre
return nil
}
func (s *RegistryEntryRequest) HandleMessage(node interfaces.Node, peer net.Peer, verifyId bool) error {
return node.Services().Registry().Set(s.sre, false, peer)
}

View File

@ -0,0 +1,66 @@
package protocol
import (
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5"
)
var _ base.IncomingMessageTyped = (*RegistryQuery)(nil)
var _ base.EncodeableMessage = (*RegistryQuery)(nil)
type RegistryQuery struct {
pk []byte
base.IncomingMessageTypedImpl
base.IncomingMessageHandler
}
func NewEmptyRegistryQuery() *RegistryQuery {
return &RegistryQuery{}
}
func NewRegistryQuery(pk []byte) *RegistryQuery {
return &RegistryQuery{pk: pk}
}
func (s *RegistryQuery) EncodeMsgpack(enc *msgpack.Encoder) error {
err := enc.EncodeInt(int64(types.ProtocolMethodRegistryQuery))
if err != nil {
return err
}
err = enc.EncodeBytes(s.pk)
if err != nil {
return err
}
return nil
}
func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder) error {
pk, err := dec.DecodeBytes()
if err != nil {
return err
}
s.pk = pk
return nil
}
func (s *RegistryQuery) HandleMessage(node interfaces.Node, peer net.Peer, verifyId bool) error {
sre, err := node.Services().Registry().Get(s.pk)
if err != nil {
return err
}
if sre != nil {
err := peer.SendMessage(MarshalSignedRegistryEntry(sre))
if err != nil {
return err
}
}
return nil
}

279
service/registry.go Normal file
View File

@ -0,0 +1,279 @@
package service
import (
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/olebedev/emitter"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
)
var _ interfaces.RegistryService = (*RegistryImpl)(nil)
const registryBucketName = "registry"
type RegistryImpl struct {
node interfaces.Node
logger *zap.Logger
streams structs.Map
subs structs.Map
}
func (r *RegistryImpl) Node() interfaces.Node {
return r.node
}
func (r *RegistryImpl) Start() error {
return nil
}
func (r *RegistryImpl) Stop() error {
return nil
}
func (r *RegistryImpl) Init() error {
return utils.CreateBucket(registryBucketName, r.node.Db())
}
func NewRegistry(node interfaces.Node, logger *zap.Logger) *RegistryImpl {
return &RegistryImpl{
node: node,
logger: logger,
streams: structs.NewMap(),
subs: structs.NewMap(),
}
}
func (r *RegistryImpl) Set(sre interfaces.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
return err
}
pid, err := receivedFrom.Id().ToString()
if err != nil {
return err
}
r.logger.Debug("[registry] set", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))
if !trusted {
if len(sre.PK()) != 33 {
return errors.New("Invalid pubkey")
}
if int(sre.PK()[0]) != int(types.HashTypeEd25519) {
return errors.New("Only ed25519 keys are supported")
}
if sre.Revision() < 0 || sre.Revision() > 281474976710656 {
return errors.New("Invalid revision")
}
if len(sre.Data()) > types.RegistryMaxDataSize {
return errors.New("Data too long")
}
if !sre.Verify() {
return errors.New("Invalid signature found")
}
}
existingEntry, err := r.getFromDB(sre.PK())
if err != nil {
return err
}
if existingEntry != nil {
if receivedFrom != nil {
if existingEntry.Revision() == sre.Revision() {
return nil
} else if existingEntry.Revision() > sre.Revision() {
updateMessage := protocol.MarshalSignedRegistryEntry(existingEntry)
err := receivedFrom.SendMessage(updateMessage)
if err != nil {
return err
}
return nil
}
}
if existingEntry.Revision() >= sre.Revision() {
return errors.New("Revision number too low")
}
}
key := encoding.NewMultihash(sre.PK())
keyString, err := key.ToString()
if err != nil {
return err
}
eventObj, ok := r.streams.Get(keyString)
if ok {
event := eventObj.(*emitter.Emitter)
event.Emit("fire", sre)
}
err = r.node.Db().Update(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre))
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = r.BroadcastEntry(sre, receivedFrom)
if err != nil {
return err
}
return nil
}
func (r *RegistryImpl) BroadcastEntry(sre interfaces.SignedRegistryEntry, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
return err
}
pid, err := receivedFrom.Id().ToString()
if err != nil {
return err
}
r.logger.Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))
updateMessage := protocol.MarshalSignedRegistryEntry(sre)
for _, p := range r.node.Services().P2P().Peers().Values() {
peer, ok := p.(net.Peer)
if !ok {
continue
}
if receivedFrom == nil || peer.Id().Equals(receivedFrom.Id()) {
err := peer.SendMessage(updateMessage)
if err != nil {
pid, err := peer.Id().ToString()
if err != nil {
return err
}
r.logger.Error("Failed to send registry broadcast", zap.String("peer", pid), zap.Error(err))
return err
}
}
}
return nil
}
func (r *RegistryImpl) SendRegistryRequest(pk []byte) error {
query := protocol.NewRegistryQuery(pk)
request, err := msgpack.Marshal(query)
if err != nil {
return err
}
// Iterate over peers and send the request
for _, peerVal := range r.node.Services().P2P().Peers().Values() {
peer, ok := peerVal.(net.Peer)
if !ok {
continue
}
err := peer.SendMessage(request)
if err != nil {
pid, err := peer.Id().ToString()
if err != nil {
return err
}
r.logger.Error("Failed to send registry request", zap.String("peer", pid), zap.Error(err))
return err
}
}
return nil
}
func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) {
key := encoding.NewMultihash(pk)
keyString, err := key.ToString()
if err != nil {
return nil, err
}
if r.subs.Contains(keyString) {
r.logger.Debug(fmt.Sprintf("[registry] get (cached) %s", key), zap.String("key", keyString))
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
return res, nil
}
err = r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
time.Sleep(200 * time.Millisecond)
return r.getFromDB(pk)
}
err = r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
r.subs.Put(keyString, key)
if !r.streams.Contains(keyString) {
event := &emitter.Emitter{}
r.streams.Put(keyString, event)
}
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res == nil {
r.logger.Debug(fmt.Sprintf("[registry] get (cached) %s", key), zap.String("key", keyString))
for i := 0; i < 200; i++ {
time.Sleep(10 * time.Millisecond)
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
break
}
}
} else {
r.logger.Debug(fmt.Sprintf("[registry] get (cached) %s", key), zap.String("key", keyString))
time.Sleep(200 * time.Millisecond)
}
return r.getFromDB(pk)
}
func (r *RegistryImpl) getFromDB(pk []byte) (sre interfaces.SignedRegistryEntry, err error) {
err = r.node.Db().View(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
val := bucket.Get(pk)
if val != nil {
entry, err := protocol.UnmarshalSignedRegistryEntry(val)
if err != nil {
return err
}
sre = entry
return nil
}
return nil
})
if err != nil {
return nil, err
}
return sre, nil
}

View File

@ -11,6 +11,7 @@ const (
ProtocolMethodRegistryQuery ProtocolMethod = 0xD
RecordTypeStorageLocation ProtocolMethod = 0x05
RecordTypeStreamEvent ProtocolMethod = 0x09
RecordTypeRegistryEntry ProtocolMethod = 0x07
)
var ProtocolMethodMap = map[ProtocolMethod]string{
@ -22,4 +23,5 @@ var ProtocolMethodMap = map[ProtocolMethod]string{
ProtocolMethodRegistryQuery: "RegistryQuery",
RecordTypeStorageLocation: "StorageLocation",
RecordTypeStreamEvent: "StreamEvent",
RecordTypeRegistryEntry: "RegistryEntry",
}

View File

@ -11,3 +11,5 @@ var RegistryTypeMap = map[RegistryType]string{
RegistryTypeCID: "CID",
RegistryTypeEncryptedCID: "EncryptedCID",
}
const RegistryMaxDataSize = 64