// libs5-go/service/default/registry.go
//
// 311 lines, 7.1 KiB, Go (header residue copied from the repository web view).

package _default
import (
// 2024-02-27 08:28:25 +00:00
"context"
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/olebedev/emitter"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
)
// registryBucketName is the name of the bbolt bucket in which signed registry
// entries are stored, keyed by the entry's public key.
const registryBucketName = "registry"
// Compile-time assertions that *RegistryServiceDefault satisfies the
// service.Service and service.RegistryService interfaces.
var (
_ service.Service = (*RegistryServiceDefault)(nil)
_ service.RegistryService = (*RegistryServiceDefault)(nil)
)
// RegistryServiceDefault is the default registry service. It persists signed
// registry entries in the database, relays them to peers and notifies local
// listeners about updates.
type RegistryServiceDefault struct {
// streams maps the string form of a public key's multihash to the
// *emitter.Emitter on which updates for that key are fired.
streams structs.Map
// subs records the keys (multihash string -> multihash) that Get has
// already requested from the network.
subs structs.Map
service.ServiceBase
}
// 2024-02-27 08:28:25 +00:00

// Start is a no-op for the registry service; it always returns nil.
func (r *RegistryServiceDefault) Start(ctx context.Context) error {
return nil
}
// 2024-02-27 08:28:25 +00:00

// Stop is a no-op for the registry service; it always returns nil.
func (r *RegistryServiceDefault) Stop(ctx context.Context) error {
return nil
}
// 2024-02-27 08:28:25 +00:00

// Init prepares the service's storage by asking utils.CreateBucket to create
// the registry bucket in the service database. The ctx argument is unused.
func (r *RegistryServiceDefault) Init(ctx context.Context) error {
	db := r.Db()
	return utils.CreateBucket(registryBucketName, db)
}
// NewRegistry builds a RegistryServiceDefault with empty stream and
// subscription maps and a ServiceBase created from the logger, config and
// database carried in params.
func NewRegistry(params service.ServiceParams) *RegistryServiceDefault {
	registry := new(RegistryServiceDefault)
	registry.streams = structs.NewMap()
	registry.subs = structs.NewMap()
	registry.ServiceBase = service.NewServiceBase(params.Logger, params.Config, params.Db)
	return registry
}
// Set validates, stores and propagates a signed registry entry.
//
// When trusted is false the entry is checked first: the public key must be 33
// bytes long and start with the ed25519 hash type, the revision must not
// exceed 2^48, the data must not exceed types.RegistryMaxDataSize and the
// signature must verify.
//
// receivedFrom is the peer the entry arrived from and may be nil (for example
// for an entry that originates locally). If a stored entry already has the
// same revision and the entry came from a peer, Set returns nil without doing
// anything; if the stored entry is newer, it is sent back to that peer
// instead. Without a peer, a revision that is not strictly greater than the
// stored one is rejected with an error.
//
// On success the entry is fired on the key's event stream (if one exists),
// written to the registry bucket and passed to BroadcastEntry.
func (r *RegistryServiceDefault) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error {
	// 281474976710656 == 2^48, the largest revision accepted from untrusted input.
	const maxRevision = 1 << 48

	hashString, err := encoding.NewMultihash(sre.PK()).ToString()
	if err != nil {
		return err
	}

	// receivedFrom is nil-able (see the checks further down), so only resolve
	// its ID for the log line when there actually is a peer.
	pid := ""
	if receivedFrom != nil {
		pid, err = receivedFrom.Id().ToString()
		if err != nil {
			return err
		}
	}

	r.Logger().Debug("[registry] set", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))

	if !trusted {
		// Cases are evaluated top to bottom, so the cheap structural checks
		// run before the signature verification.
		switch {
		case len(sre.PK()) != 33:
			return errors.New("Invalid pubkey")
		case int(sre.PK()[0]) != int(types.HashTypeEd25519):
			return errors.New("Only ed25519 keys are supported")
		case sre.Revision() > maxRevision:
			// The revision is unsigned, so only the upper bound can be violated.
			return errors.New("Invalid revision")
		case len(sre.Data()) > types.RegistryMaxDataSize:
			return errors.New("Data too long")
		case !sre.Verify():
			return errors.New("Invalid signature found")
		}
	}

	existingEntry, err := r.getFromDB(sre.PK())
	if err != nil {
		return err
	}

	if existingEntry != nil {
		if receivedFrom != nil {
			if existingEntry.Revision() == sre.Revision() {
				// The peer told us what we already know.
				return nil
			}
			if existingEntry.Revision() > sre.Revision() {
				// The peer is behind: answer with our newer entry.
				return receivedFrom.SendMessage(protocol.MarshalSignedRegistryEntry(existingEntry))
			}
		}

		if existingEntry.Revision() >= sre.Revision() {
			return errors.New("Revision number too low")
		}
	}

	// The stream map is keyed by the same multihash string computed above.
	if eventObj, ok := r.streams.Get(hashString); ok {
		if event, ok := eventObj.(*emitter.Emitter); ok {
			go event.Emit("fire", sre)
		}
	}

	err = r.Db().Update(func(txn *bbolt.Tx) error {
		bucket := txn.Bucket([]byte(registryBucketName))
		if bucket == nil {
			// Tx.Bucket returns nil when the bucket was never created.
			return bbolt.ErrBucketNotFound
		}
		return bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre))
	})
	if err != nil {
		return err
	}

	return r.BroadcastEntry(sre, receivedFrom)
}
// BroadcastEntry sends the serialized entry to every connected peer except
// the one it was received from. receivedFrom may be nil, in which case the
// entry is sent to all peers.
//
// Delivery is best-effort: a failed send is logged together with the peer ID
// and the remaining peers are still contacted. An error is returned only when
// an identifier needed for the debug log line cannot be rendered.
func (r *RegistryServiceDefault) BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error {
	hashString, err := encoding.NewMultihash(sre.PK()).ToString()
	if err != nil {
		return err
	}

	// receivedFrom is nil-able, so only resolve its ID when there is a peer.
	pid := ""
	if receivedFrom != nil {
		pid, err = receivedFrom.Id().ToString()
		if err != nil {
			return err
		}
	}

	r.Logger().Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))

	updateMessage := protocol.MarshalSignedRegistryEntry(sre)

	for _, p := range r.Services().P2P().Peers().Values() {
		peer, ok := p.(net.Peer)
		if !ok {
			continue
		}

		// Do not echo the entry back to the peer that delivered it.
		if receivedFrom != nil && peer.Id().Equals(receivedFrom.Id()) {
			continue
		}

		if sendErr := peer.SendMessage(updateMessage); sendErr != nil {
			// Keep the send error in its own variable so that resolving the
			// peer ID for the log cannot overwrite it.
			fields := []zap.Field{zap.Error(sendErr)}
			if peerID, idErr := peer.Id().ToString(); idErr == nil {
				fields = append(fields, zap.String("peer", peerID))
			}
			r.Logger().Error("Failed to send registry broadcast", fields...)
		}
	}

	return nil
}
// SendRegistryRequest asks every connected peer for the registry entry that
// belongs to the public key pk.
//
// The query is msgpack-encoded once and sent to each peer. Delivery is
// best-effort: a failed send is logged together with the peer ID and the
// remaining peers are still contacted. An error is returned only when the
// query itself cannot be encoded.
func (r *RegistryServiceDefault) SendRegistryRequest(pk []byte) error {
	request, err := msgpack.Marshal(protocol.NewRegistryQuery(pk))
	if err != nil {
		return err
	}

	for _, peerVal := range r.Services().P2P().Peers().Values() {
		peer, ok := peerVal.(net.Peer)
		if !ok {
			continue
		}

		if sendErr := peer.SendMessage(request); sendErr != nil {
			// Keep the send error in its own variable so that resolving the
			// peer ID for the log cannot overwrite it.
			fields := []zap.Field{zap.Error(sendErr)}
			if peerID, idErr := peer.Id().ToString(); idErr == nil {
				fields = append(fields, zap.String("peer", peerID))
			}
			r.Logger().Error("Failed to send registry request", fields...)
		}
	}

	return nil
}
// Get returns the registry entry stored for the public key pk, asking the
// network for it when necessary.
//
// If the key has been requested before, the database is consulted first; on a
// miss the request is re-sent and the database is read once more after 200ms.
// Otherwise the request is sent, the key is recorded as subscribed, an event
// stream is created for it if none exists, and on a database miss the
// database is polled every 10ms for up to 200 attempts.
//
// The returned entry is nil (with a nil error) when nothing was found within
// that time. Note that this method blocks the calling goroutine while waiting.
func (r *RegistryServiceDefault) Get(pk []byte) (protocol.SignedRegistryEntry, error) {
	const (
		resubscribeWait = 200 * time.Millisecond
		pollInterval    = 10 * time.Millisecond
		pollAttempts    = 200
	)

	key := encoding.NewMultihash(pk)
	keyString, err := key.ToString()
	if err != nil {
		return nil, err
	}

	if r.subs.Contains(keyString) {
		r.Logger().Debug("[registry] get (cached)", zap.String("key", keyString))

		res, err := r.getFromDB(pk)
		if err != nil {
			return nil, err
		}
		if res != nil {
			return res, nil
		}

		if err := r.SendRegistryRequest(pk); err != nil {
			return nil, err
		}
		// Give peers a moment to answer before the final lookup.
		time.Sleep(resubscribeWait)
		return r.getFromDB(pk)
	}

	if err := r.SendRegistryRequest(pk); err != nil {
		return nil, err
	}

	r.subs.Put(keyString, key)
	if !r.streams.Contains(keyString) {
		// Same constructor as Listen uses, so both paths create identical streams.
		r.streams.Put(keyString, emitter.New(0))
	}

	res, err := r.getFromDB(pk)
	if err != nil {
		return nil, err
	}
	if res != nil {
		return res, nil
	}

	// Nothing is stored locally yet, so this is not a cache hit.
	r.Logger().Debug("[registry] get (clean)", zap.String("key", keyString))

	for i := 0; i < pollAttempts && res == nil; i++ {
		time.Sleep(pollInterval)
		res, err = r.getFromDB(pk)
		if err != nil {
			return nil, err
		}
	}

	return res, nil
}
// Listen registers cb to be called with every registry entry fired for the
// public key pk and returns a function that removes the registration again.
//
// If no event stream exists for the key yet, one is created and a registry
// request for the key is sent to the peers; an error from that request is
// returned to the caller.
func (r *RegistryServiceDefault) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) {
	key, err := encoding.NewMultihash(pk).ToString()
	if err != nil {
		return nil, err
	}

	cbProxy := func(event *emitter.Event) {
		// Guard the index: an event emitted without arguments must not panic.
		if len(event.Args) == 0 {
			r.Logger().Error("Failed to cast event to SignedRegistryEntry")
			return
		}
		sre, ok := event.Args[0].(protocol.SignedRegistryEntry)
		if !ok {
			r.Logger().Error("Failed to cast event to SignedRegistryEntry")
			return
		}

		cb(sre)
	}

	var stream *emitter.Emitter
	if existing, found := r.streams.Get(key); found {
		em, ok := existing.(*emitter.Emitter)
		if !ok {
			return nil, errors.New("registry stream has unexpected type")
		}
		stream = em
	} else {
		stream = emitter.New(0)
		r.streams.Put(key, stream)

		if err := r.SendRegistryRequest(pk); err != nil {
			return nil, err
		}
	}

	// cbProxy is passed to On as a middleware.
	// NOTE(review): the channel returned by On is never read here; confirm
	// against the emitter documentation that an undrained listener channel
	// cannot block Emit.
	channel := stream.On("fire", cbProxy)

	return func() {
		stream.Off("fire", channel)
	}, nil
}
// getFromDB loads the signed registry entry stored under the public key pk.
//
// It returns a nil entry and a nil error when no value is stored for pk, and
// an error when the registry bucket is missing, the read transaction fails or
// the stored bytes cannot be decoded.
func (r *RegistryServiceDefault) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) {
	err = r.Db().View(func(txn *bbolt.Tx) error {
		bucket := txn.Bucket([]byte(registryBucketName))
		if bucket == nil {
			// Tx.Bucket returns nil when the bucket was never created.
			return bbolt.ErrBucketNotFound
		}

		val := bucket.Get(pk)
		if val == nil {
			return nil
		}

		// bbolt only guarantees the returned slice for the life of the
		// transaction, while the decoded entry outlives it and may keep
		// references into its input. Decode from a private copy.
		raw := append([]byte(nil), val...)

		entry, decodeErr := protocol.UnmarshalSignedRegistryEntry(raw)
		if decodeErr != nil {
			return decodeErr
		}

		sre = entry
		return nil
	})
	if err != nil {
		return nil, err
	}

	return sre, nil
}