feat: add listen method

This commit is contained in:
Derrick Hammer 2024-01-10 07:05:13 -05:00
parent 2cfbacbcd7
commit 180b76ee3c
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 34 additions and 0 deletions

View File

@ -23,5 +23,6 @@ type RegistryService interface {
Get(pk []byte) (SignedRegistryEntry, error)
BroadcastEntry(sre SignedRegistryEntry, receivedFrom net.Peer) error
SendRegistryRequest(pk []byte) error
Listen(pk []byte, cb func(sre SignedRegistryEntry)) (func(), error)
Service
}

View File

@ -260,6 +260,39 @@ func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) {
return nil, nil
}
func (r *RegistryImpl) Listen(pk []byte, cb func(sre interfaces.SignedRegistryEntry)) (func(), error) {
key, err := encoding.NewMultihash(pk).ToString()
if err != nil {
return nil, err
}
cbProxy := func(event *emitter.Event) {
sre, ok := event.Args[0].(interfaces.SignedRegistryEntry)
if !ok {
r.logger.Error("Failed to cast event to SignedRegistryEntry")
return
}
cb(sre)
}
if !r.streams.Contains(key) {
em := emitter.New(0)
r.streams.Put(key, em)
err := r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
}
streamVal, _ := r.streams.Get(key)
stream := streamVal.(*emitter.Emitter)
channel := stream.On("event", cbProxy)
return func() {
stream.Off("event", channel)
}, nil
}
func (r *RegistryImpl) getFromDB(pk []byte) (sre interfaces.SignedRegistryEntry, err error) {
err = r.node.Db().View(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))