feat: implement new kv database package starting with bbolt

This commit is contained in:
Derrick Hammer 2024-03-09 06:46:48 -05:00
parent cc2964e80f
commit c9fe8a0819
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
10 changed files with 210 additions and 102 deletions

View File

@ -1,15 +1,15 @@
package config
import (
"git.lumeweb.com/LumeWeb/libs5-go/db"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
)
type NodeConfig struct {
P2P P2PConfig `mapstructure:"p2p"`
KeyPair *ed25519.KeyPairEd25519
DB *bolt.DB
DB db.KVStore
Logger *zap.Logger
HTTP HTTPConfig `mapstructure:"http"`
}

121
db/bboltdb.go Normal file
View File

@ -0,0 +1,121 @@
package db
import (
"errors"
"go.etcd.io/bbolt"
)
var _ KVStore = (*BboltDBKVStore)(nil)
type BboltDBKVStore struct {
db *bbolt.DB
bucket *bbolt.Bucket
bucketName string
root bool
dbPath string
}
func (b BboltDBKVStore) Open() error {
if b.root && b.db == nil {
db, err := bbolt.Open(b.dbPath, 0666, nil)
if err != nil {
return err
}
b.db = db
}
if b.bucket == nil && len(b.bucketName) > 0 {
err := b.db.Update(func(txn *bbolt.Tx) error {
bucket, err := txn.CreateBucketIfNotExists([]byte(b.bucketName))
if err != nil {
return err
}
b.bucket = bucket
return nil
})
if err != nil {
return err
}
}
return nil
}
func (b BboltDBKVStore) Close() error {
if b.root && b.db != nil {
err := b.db.Close()
if err != nil {
return err
}
}
return nil
}
func (b BboltDBKVStore) Get(key []byte) ([]byte, error) {
if b.root {
return nil, errors.New("Cannot get from root")
}
var val []byte
err := b.db.View(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(b.bucketName))
val = bucket.Get(key)
return nil
})
if err != nil {
return nil, err
}
return val, nil
}
func (b BboltDBKVStore) Put(key []byte, value []byte) error {
if b.root {
return errors.New("Cannot put from root")
}
err := b.db.Update(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(b.bucketName))
err := bucket.Put(key, value)
if err != nil {
return err
}
return nil
})
return err
}
func (b BboltDBKVStore) Delete(key []byte) error {
if b.root {
return errors.New("Cannot delete from root")
}
err := b.db.Update(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(b.bucketName))
err := bucket.Delete(key)
if err != nil {
return err
}
return nil
})
return err
}
func (b BboltDBKVStore) Bucket(prefix string) (KVStore, error) {
return &BboltDBKVStore{
db: b.db,
bucketName: prefix,
root: false,
}, nil
}
func NewBboltDBKVStore(dbPath string) *BboltDBKVStore {
return &BboltDBKVStore{
dbPath: dbPath,
root: true,
}
}

10
db/db.go Normal file
View File

@ -0,0 +1,10 @@
package db
type KVStore interface {
Open() error
Close() error
Get(key []byte) ([]byte, error)
Put(key []byte, value []byte) error
Delete(key []byte) error
Bucket(prefix string) (KVStore, error)
}

View File

@ -3,11 +3,11 @@ package node
import (
"context"
"git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/db"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/service"
_default "git.lumeweb.com/LumeWeb/libs5-go/service/default"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
)
@ -42,7 +42,7 @@ func (n *Node) Logger() *zap.Logger {
return nil
}
func (n *Node) Db() *bolt.DB {
func (n *Node) Db() db.KVStore {
if n.nodeConfig != nil {
return n.nodeConfig.DB
}

View File

@ -76,6 +76,11 @@ func (s *ServicesImpl) IsStarting() bool {
}
func (s *ServicesImpl) Init(ctx context.Context) error {
err := s.p2p.Config().DB.Open()
if err != nil {
return err
}
for _, svc := range s.All() {
err := svc.Init(ctx)
if err != nil {
@ -87,7 +92,6 @@ func (s *ServicesImpl) Init(ctx context.Context) error {
}
func (s *ServicesImpl) Start(ctx context.Context) error {
s.starting = true
for _, svc := range s.All() {

View File

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/db"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
@ -12,7 +13,6 @@ import (
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/vmihailenco/msgpack/v5"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
@ -48,6 +48,7 @@ type P2PServiceDefault struct {
maxOutgoingPeerFailures uint
connections sync.WaitGroup
hashQueryRoutingTable structs.Map
bucket db.KVStore
service.ServiceBase
}
@ -117,14 +118,20 @@ func (p *P2PServiceDefault) Init(ctx context.Context) error {
if p.inited {
return nil
}
p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey())
err := utils.CreateBucket(nodeBucketName, p.Db())
bucket, err := p.Db().Bucket(nodeBucketName)
if err != nil {
return err
}
err = bucket.Open()
if err != nil {
return err
}
p.bucket = bucket
p.inited = true
return nil
@ -490,23 +497,13 @@ func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) {
func (p *P2PServiceDefault) readNodeVotes(nodeId *encoding.NodeId) (service.NodeVotes, error) {
var value []byte
var found bool
err := p.Db().View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(nodeBucketName))
if b == nil {
return fmt.Errorf("Bucket %s not found", nodeBucketName)
}
value = b.Get(nodeId.Raw())
if value == nil {
return nil
}
found = true
return nil
})
value, err := p.bucket.Get(nodeId.Raw())
if err != nil {
return nil, err
}
if !found {
if value == nil {
return service.NewNodeVotes(), nil
}
@ -526,16 +523,12 @@ func (p *P2PServiceDefault) saveNodeVotes(nodeId *encoding.NodeId, votes service
return err
}
// Use a transaction to save the data
err = p.Db().Update(func(tx *bolt.Tx) error {
// Get or create the bucket
b := tx.Bucket([]byte(nodeBucketName))
err = p.bucket.Put(nodeId.Raw(), data)
if err != nil {
return err
}
// Put the data into the bucket
return b.Put(nodeId.Raw(), data)
})
return err
return nil
}
func (p *P2PServiceDefault) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {

View File

@ -3,16 +3,15 @@ package _default
import (
"context"
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/db"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/olebedev/emitter"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
)
@ -27,6 +26,7 @@ var (
type RegistryServiceDefault struct {
streams structs.Map
subs structs.Map
bucket db.KVStore
service.ServiceBase
}
@ -39,7 +39,19 @@ func (r *RegistryServiceDefault) Stop(ctx context.Context) error {
}
func (r *RegistryServiceDefault) Init(ctx context.Context) error {
return utils.CreateBucket(registryBucketName, r.Db())
bucket, err := r.Db().Bucket(registryBucketName)
if err != nil {
return err
}
err = bucket.Open()
if err != nil {
return err
}
r.bucket = bucket
return nil
}
func NewRegistry(params service.ServiceParams) *RegistryServiceDefault {
@ -116,15 +128,7 @@ func (r *RegistryServiceDefault) Set(sre protocol.SignedRegistryEntry, trusted b
go event.Emit("fire", sre)
}
err = r.Db().Update(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre))
if err != nil {
return err
}
return nil
})
err = r.bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre))
if err != nil {
return err
}
@ -289,22 +293,17 @@ func (r *RegistryServiceDefault) Listen(pk []byte, cb func(sre protocol.SignedRe
}
func (r *RegistryServiceDefault) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) {
err = r.Db().View(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
val := bucket.Get(pk)
if val != nil {
entry, err := protocol.UnmarshalSignedRegistryEntry(val)
if err != nil {
return err
}
sre = entry
return nil
}
return nil
})
value, err := r.bucket.Get(pk)
if err != nil {
return nil, err
}
return sre, nil
if value != nil {
sre, err = protocol.UnmarshalSignedRegistryEntry(value)
if err != nil {
return nil, err
}
}
return nil, nil
}

View File

@ -3,7 +3,7 @@ package _default
import (
"context"
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/db"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/service"
@ -11,11 +11,9 @@ import (
"git.lumeweb.com/LumeWeb/libs5-go/storage/provider"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/ddo/rq"
_ "github.com/ddo/rq"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"io"
"net/http"
@ -36,6 +34,7 @@ var (
type StorageService struct {
metadataCache structs.Map
providerStore storage.ProviderStore
bucket db.KVStore
service.ServiceBase
}
@ -47,13 +46,19 @@ func NewStorage(params service.ServiceParams) *StorageService {
}
func (s *StorageService) Start(ctx context.Context) error {
err :=
utils.CreateBucket(cacheBucketName, s.Db())
bucket, err := s.Db().Bucket(cacheBucketName)
if err != nil {
return err
}
err = bucket.Open()
if err != nil {
return err
}
s.bucket = bucket
return nil
}
@ -160,22 +165,18 @@ func (s *StorageService) getLocalStorageLocation(hash *encoding.Multihash, kinds
func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
var locationMap storage.StorageLocationMap
err := s.Db().View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name
if b == nil {
return fmt.Errorf("bucket %s not found", cacheBucketName)
}
value, err := s.bucket.Get(hash.FullBytes())
if err != nil {
return nil, err
}
bytes := b.Get(hash.FullBytes())
if bytes == nil {
// If no data found, return an empty locationMap but no error
locationMap = storage.NewStorageLocationMap()
return nil
}
if value == nil {
return storage.NewStorageLocationMap(), nil
}
return msgpack.Unmarshal(bytes, &locationMap)
})
locationMap = storage.NewStorageLocationMap()
err = msgpack.Unmarshal(value, &locationMap)
if err != nil {
return nil, err
}
@ -217,11 +218,8 @@ func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *en
if err != nil {
return err
}
err = s.Db().Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName))
return b.Put(hash.FullBytes(), packedBytes)
})
err = s.bucket.Put(hash.FullBytes(), packedBytes)
if err != nil {
return err
}

View File

@ -3,7 +3,7 @@ package service
import (
"context"
"git.lumeweb.com/LumeWeb/libs5-go/config"
"go.etcd.io/bbolt"
"git.lumeweb.com/LumeWeb/libs5-go/db"
"go.uber.org/zap"
)
@ -17,7 +17,7 @@ type Service interface {
Init(ctx context.Context) error
Logger() *zap.Logger
Config() *config.NodeConfig
Db() *bbolt.DB
Db() db.KVStore
ServicesSetter
}
type Services interface {
@ -36,17 +36,17 @@ type Services interface {
type ServiceParams struct {
Logger *zap.Logger
Config *config.NodeConfig
Db *bbolt.DB
Db db.KVStore
}
type ServiceBase struct {
logger *zap.Logger
config *config.NodeConfig
db *bbolt.DB
db db.KVStore
services Services
}
func NewServiceBase(logger *zap.Logger, config *config.NodeConfig, db *bbolt.DB) ServiceBase {
func NewServiceBase(logger *zap.Logger, config *config.NodeConfig, db db.KVStore) ServiceBase {
return ServiceBase{logger: logger, config: config, db: db}
}
@ -62,6 +62,6 @@ func (s *ServiceBase) Logger() *zap.Logger {
func (s *ServiceBase) Config() *config.NodeConfig {
return s.config
}
func (s *ServiceBase) Db() *bbolt.DB {
func (s *ServiceBase) Db() db.KVStore {
return s.db
}

View File

@ -1,17 +0,0 @@
package utils
import bolt "go.etcd.io/bbolt"
func CreateBucket(name string, db *bolt.DB) error {
err :=
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(name))
if err != nil {
return err
}
return nil
})
return err
}