From c9fe8a08191cdca29f3d8f1de6d5e5bfed20a178 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sat, 9 Mar 2024 06:46:48 -0500 Subject: [PATCH] feat: implement new kv database package starting with bbolt --- config/config.go | 4 +- db/bboltdb.go | 121 ++++++++++++++++++++++++++++++++++++ db/db.go | 10 +++ node/node.go | 4 +- node/services.go | 6 +- service/default/p2p.go | 45 ++++++-------- service/default/registry.go | 51 ++++++++------- service/default/storage.go | 42 ++++++------- service/service.go | 12 ++-- utils/bucket.go | 17 ----- 10 files changed, 210 insertions(+), 102 deletions(-) create mode 100644 db/bboltdb.go create mode 100644 db/db.go delete mode 100644 utils/bucket.go diff --git a/config/config.go b/config/config.go index 5483727..16a500c 100644 --- a/config/config.go +++ b/config/config.go @@ -1,15 +1,15 @@ package config import ( + "git.lumeweb.com/LumeWeb/libs5-go/db" "git.lumeweb.com/LumeWeb/libs5-go/ed25519" - bolt "go.etcd.io/bbolt" "go.uber.org/zap" ) type NodeConfig struct { P2P P2PConfig `mapstructure:"p2p"` KeyPair *ed25519.KeyPairEd25519 - DB *bolt.DB + DB db.KVStore Logger *zap.Logger HTTP HTTPConfig `mapstructure:"http"` } diff --git a/db/bboltdb.go b/db/bboltdb.go new file mode 100644 index 0000000..c6240d6 --- /dev/null +++ b/db/bboltdb.go @@ -0,0 +1,121 @@ +package db + +import ( + "errors" + "go.etcd.io/bbolt" +) + +var _ KVStore = (*BboltDBKVStore)(nil) + +type BboltDBKVStore struct { + db *bbolt.DB + bucket *bbolt.Bucket + bucketName string + root bool + dbPath string +} + +func (b BboltDBKVStore) Open() error { + if b.root && b.db == nil { + db, err := bbolt.Open(b.dbPath, 0666, nil) + if err != nil { + return err + } + b.db = db + } + + if b.bucket == nil && len(b.bucketName) > 0 { + err := b.db.Update(func(txn *bbolt.Tx) error { + bucket, err := txn.CreateBucketIfNotExists([]byte(b.bucketName)) + if err != nil { + return err + } + b.bucket = bucket + return nil + }) + if err != nil { + return err + } + } + + return nil +} + +func (b BboltDBKVStore) Close() error { + if b.root && b.db != nil { + err := b.db.Close() + if err != nil { + return err + } + } + + return nil +} + +func (b BboltDBKVStore) Get(key []byte) ([]byte, error) { + if b.root { + return nil, errors.New("Cannot get from root") + } + + var val []byte + err := b.db.View(func(txn *bbolt.Tx) error { + bucket := txn.Bucket([]byte(b.bucketName)) + val = bucket.Get(key) + return nil + }) + if err != nil { + return nil, err + } + + return val, nil +} + +func (b BboltDBKVStore) Put(key []byte, value []byte) error { + + if b.root { + return errors.New("Cannot put from root") + } + + err := b.db.Update(func(txn *bbolt.Tx) error { + bucket := txn.Bucket([]byte(b.bucketName)) + err := bucket.Put(key, value) + if err != nil { + return err + } + return nil + }) + + return err +} + +func (b BboltDBKVStore) Delete(key []byte) error { + if b.root { + return errors.New("Cannot delete from root") + } + + err := b.db.Update(func(txn *bbolt.Tx) error { + bucket := txn.Bucket([]byte(b.bucketName)) + err := bucket.Delete(key) + if err != nil { + return err + } + return nil + }) + + return err +} + +func (b BboltDBKVStore) Bucket(prefix string) (KVStore, error) { + return &BboltDBKVStore{ + db: b.db, + bucketName: prefix, + root: false, + }, nil +} + +func NewBboltDBKVStore(dbPath string) *BboltDBKVStore { + return &BboltDBKVStore{ + dbPath: dbPath, + root: true, + } +} diff --git a/db/db.go b/db/db.go new file mode 100644 index 0000000..a1e768d --- /dev/null +++ b/db/db.go @@ -0,0 +1,10 @@ +package db + +type KVStore interface { + Open() error + Close() error + Get(key []byte) ([]byte, error) + Put(key []byte, value []byte) error + Delete(key []byte) error + Bucket(prefix string) (KVStore, error) +} diff --git a/node/node.go b/node/node.go index c284450..b9f2da0 100644 --- a/node/node.go +++ b/node/node.go @@ -3,11 +3,11 @@ package node import ( "context" "git.lumeweb.com/LumeWeb/libs5-go/config" + "git.lumeweb.com/LumeWeb/libs5-go/db" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/service" _default "git.lumeweb.com/LumeWeb/libs5-go/service/default" - bolt "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -42,7 +42,7 @@ func (n *Node) Logger() *zap.Logger { return nil } -func (n *Node) Db() *bolt.DB { +func (n *Node) Db() db.KVStore { if n.nodeConfig != nil { return n.nodeConfig.DB } diff --git a/node/services.go b/node/services.go index e11bae2..7bc7c8d 100644 --- a/node/services.go +++ b/node/services.go @@ -76,6 +76,11 @@ func (s *ServicesImpl) IsStarting() bool { } func (s *ServicesImpl) Init(ctx context.Context) error { + err := s.p2p.Config().DB.Open() + if err != nil { + return err + } + for _, svc := range s.All() { err := svc.Init(ctx) if err != nil { @@ -87,7 +92,6 @@ func (s *ServicesImpl) Init(ctx context.Context) error { } func (s *ServicesImpl) Start(ctx context.Context) error { - s.starting = true for _, svc := range s.All() { diff --git a/service/default/p2p.go b/service/default/p2p.go index 1b22658..23bb993 100644 --- a/service/default/p2p.go +++ b/service/default/p2p.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/db" "git.lumeweb.com/LumeWeb/libs5-go/ed25519" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" @@ -12,7 +13,6 @@ import ( "git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/libs5-go/utils" "github.com/vmihailenco/msgpack/v5" bolt "go.etcd.io/bbolt" "go.uber.org/zap" @@ -48,6 +48,7 @@ type P2PServiceDefault struct { maxOutgoingPeerFailures uint connections sync.WaitGroup hashQueryRoutingTable structs.Map + bucket db.KVStore service.ServiceBase } @@ -117,14 +118,20 @@ func (p *P2PServiceDefault) Init(ctx context.Context) error { if p.inited { return nil } + p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey()) - err := utils.CreateBucket(nodeBucketName, p.Db()) - + bucket, err := p.Db().Bucket(nodeBucketName) if err != nil { return err } + err = bucket.Open() + if err != nil { + return err + } + + p.bucket = bucket p.inited = true return nil @@ -490,23 +497,13 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) { func (p *P2PServiceDefault) readNodeVotes(nodeId *encoding.NodeId) (service.NodeVotes, error) { var value []byte - var found bool - err := p.Db().View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(nodeBucketName)) - if b == nil { - return fmt.Errorf("Bucket %s not found", nodeBucketName) - } - value = b.Get(nodeId.Raw()) - if value == nil { - return nil - } - found = true - return nil - }) + + value, err := p.bucket.Get(nodeId.Raw()) if err != nil { return nil, err } - if !found { + + if value == nil { return service.NewNodeVotes(), nil } @@ -526,16 +523,12 @@ func (p *P2PServiceDefault) saveNodeVotes(nodeId *encoding.NodeId, votes service return err } - // Use a transaction to save the data - err = p.Db().Update(func(tx *bolt.Tx) error { - // Get or create the bucket - b := tx.Bucket([]byte(nodeBucketName)) + err = p.bucket.Put(nodeId.Raw(), data) + if err != nil { + return err + } - // Put the data into the bucket - return b.Put(nodeId.Raw(), data) - }) - - return err + return nil } func (p *P2PServiceDefault) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { diff --git a/service/default/registry.go b/service/default/registry.go index 31a155b..8b3f3db 100644 --- a/service/default/registry.go +++ b/service/default/registry.go @@ -3,16 +3,15 @@ package _default import ( "context" "errors" + "git.lumeweb.com/LumeWeb/libs5-go/db" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/libs5-go/utils" "github.com/olebedev/emitter" "github.com/vmihailenco/msgpack/v5" - "go.etcd.io/bbolt" "go.uber.org/zap" "time" ) @@ -27,6 +26,7 @@ var ( type RegistryServiceDefault struct { streams structs.Map subs structs.Map + bucket db.KVStore service.ServiceBase } @@ -39,7 +39,19 @@ func (r *RegistryServiceDefault) Stop(ctx context.Context) error { } func (r *RegistryServiceDefault) Init(ctx context.Context) error { - return utils.CreateBucket(registryBucketName, r.Db()) + bucket, err := r.Db().Bucket(registryBucketName) + if err != nil { + return err + } + + err = bucket.Open() + if err != nil { + return err + } + + r.bucket = bucket + + return nil } func NewRegistry(params service.ServiceParams) *RegistryServiceDefault { @@ -116,15 +128,7 @@ func (r *RegistryServiceDefault) Set(sre protocol.SignedRegistryEntry, trusted b go event.Emit("fire", sre) } - err = r.Db().Update(func(txn *bbolt.Tx) error { - bucket := txn.Bucket([]byte(registryBucketName)) - err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre)) - if err != nil { - return err - } - return nil - }) - + err = r.bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre)) if err != nil { return err } @@ -289,22 +293,17 @@ func (r *RegistryServiceDefault) Listen(pk []byte, cb func(sre protocol.SignedRe } func (r *RegistryServiceDefault) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) { - err = r.Db().View(func(txn *bbolt.Tx) error { - bucket := txn.Bucket([]byte(registryBucketName)) - val := bucket.Get(pk) - if val != nil { - entry, err := protocol.UnmarshalSignedRegistryEntry(val) - if err != nil { - return err - } - sre = entry - return nil - } - return nil - }) + value, err := r.bucket.Get(pk) if err != nil { return nil, err } - return sre, nil + if value != nil { + sre, err = protocol.UnmarshalSignedRegistryEntry(value) + if err != nil { + return nil, err + } + } + + return nil, nil } diff --git a/service/default/storage.go b/service/default/storage.go index 1cc2d1d..3f0041c 100644 --- a/service/default/storage.go +++ b/service/default/storage.go @@ -3,7 +3,7 @@ package _default import ( "context" "errors" - "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/db" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/service" @@ -11,11 +11,9 @@ import ( "git.lumeweb.com/LumeWeb/libs5-go/storage/provider" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/libs5-go/utils" "github.com/ddo/rq" _ "github.com/ddo/rq" "github.com/vmihailenco/msgpack/v5" - "go.etcd.io/bbolt" "go.uber.org/zap" "io" "net/http" @@ -36,6 +34,7 @@ var ( type StorageService struct { metadataCache structs.Map providerStore storage.ProviderStore + bucket db.KVStore service.ServiceBase } @@ -47,13 +46,19 @@ func NewStorage(params service.ServiceParams) *StorageService { } func (s *StorageService) Start(ctx context.Context) error { - err := - utils.CreateBucket(cacheBucketName, s.Db()) + bucket, err := s.Db().Bucket(cacheBucketName) if err != nil { return err } + err = bucket.Open() + if err != nil { + return err + } + + s.bucket = bucket + return nil } @@ -160,22 +165,18 @@ func (s *StorageService) getLocalStorageLocation(hash *encoding.Multihash, kinds func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { var locationMap storage.StorageLocationMap - err := s.Db().View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name - if b == nil { - return fmt.Errorf("bucket %s not found", cacheBucketName) - } + value, err := s.bucket.Get(hash.FullBytes()) + if err != nil { + return nil, err + } - bytes := b.Get(hash.FullBytes()) - if bytes == nil { - // If no data found, return an empty locationMap but no error - locationMap = storage.NewStorageLocationMap() - return nil - } + if value == nil { + return storage.NewStorageLocationMap(), nil + } - return msgpack.Unmarshal(bytes, &locationMap) - }) + locationMap = storage.NewStorageLocationMap() + err = msgpack.Unmarshal(value, &locationMap) if err != nil { return nil, err } @@ -217,11 +218,8 @@ func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *en if err != nil { return err } - err = s.Db().Update(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(cacheBucketName)) - return b.Put(hash.FullBytes(), packedBytes) - }) + err = s.bucket.Put(hash.FullBytes(), packedBytes) if err != nil { return err } diff --git a/service/service.go b/service/service.go index ca57c3b..3875c8f 100644 --- a/service/service.go +++ b/service/service.go @@ -3,7 +3,7 @@ package service import ( "context" "git.lumeweb.com/LumeWeb/libs5-go/config" - "go.etcd.io/bbolt" + "git.lumeweb.com/LumeWeb/libs5-go/db" "go.uber.org/zap" ) @@ -17,7 +17,7 @@ type Service interface { Init(ctx context.Context) error Logger() *zap.Logger Config() *config.NodeConfig - Db() *bbolt.DB + Db() db.KVStore ServicesSetter } type Services interface { @@ -36,17 +36,17 @@ type Services interface { type ServiceParams struct { Logger *zap.Logger Config *config.NodeConfig - Db *bbolt.DB + Db db.KVStore } type ServiceBase struct { logger *zap.Logger config *config.NodeConfig - db *bbolt.DB + db db.KVStore services Services } -func NewServiceBase(logger *zap.Logger, config *config.NodeConfig, db *bbolt.DB) ServiceBase { +func NewServiceBase(logger *zap.Logger, config *config.NodeConfig, db db.KVStore) ServiceBase { return ServiceBase{logger: logger, config: config, db: db} } @@ -62,6 +62,6 @@ func (s *ServiceBase) Logger() *zap.Logger { func (s *ServiceBase) Config() *config.NodeConfig { return s.config } -func (s *ServiceBase) Db() *bbolt.DB { +func (s *ServiceBase) Db() db.KVStore { return s.db } diff --git a/utils/bucket.go b/utils/bucket.go deleted file mode 100644 index 45984f0..0000000 --- a/utils/bucket.go +++ /dev/null @@ -1,17 +0,0 @@ -package utils - -import bolt "go.etcd.io/bbolt" - -func CreateBucket(name string, db *bolt.DB) error { - err := - db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists([]byte(name)) - if err != nil { - return err - } - - return nil - }) - - return err -}