diff --git a/interfaces/registry.go b/interfaces/registry.go index 3366ceb..feec03c 100644 --- a/interfaces/registry.go +++ b/interfaces/registry.go @@ -23,5 +23,6 @@ type RegistryService interface { Get(pk []byte) (SignedRegistryEntry, error) BroadcastEntry(sre SignedRegistryEntry, receivedFrom net.Peer) error SendRegistryRequest(pk []byte) error + Listen(pk []byte, cb func(sre SignedRegistryEntry)) (func(), error) Service } diff --git a/service/registry.go b/service/registry.go index f91f87e..cbfbaac 100644 --- a/service/registry.go +++ b/service/registry.go @@ -260,6 +260,39 @@ func (r *RegistryImpl) Get(pk []byte) (interfaces.SignedRegistryEntry, error) { return nil, nil } +func (r *RegistryImpl) Listen(pk []byte, cb func(sre interfaces.SignedRegistryEntry)) (func(), error) { + key, err := encoding.NewMultihash(pk).ToString() + if err != nil { + return nil, err + } + + cbProxy := func(event *emitter.Event) { + sre, ok := event.Args[0].(interfaces.SignedRegistryEntry) + if !ok { + r.logger.Error("Failed to cast event to SignedRegistryEntry") + return + } + + cb(sre) + } + + if !r.streams.Contains(key) { + em := emitter.New(0) + r.streams.Put(key, em) + err := r.SendRegistryRequest(pk) + if err != nil { + return nil, err + } + } + streamVal, _ := r.streams.Get(key) + stream := streamVal.(*emitter.Emitter) + channel := stream.On("event", cbProxy) + + return func() { + stream.Off("event", channel) + }, nil +} + func (r *RegistryImpl) getFromDB(pk []byte) (sre interfaces.SignedRegistryEntry, err error) { err = r.node.Db().View(func(txn *bbolt.Tx) error { bucket := txn.Bucket([]byte(registryBucketName))