From c137d75b24f7a9a860105077ade023dd768139ed Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Tue, 9 Jan 2024 15:49:23 -0500 Subject: [PATCH] fix: we need to run all bucket actions via transactions and cannot store a pointer to the bucket --- node/node.go | 34 +++++++++++++++++++++++----------- service/p2p.go | 46 +++++++++++++++++++++++++++++++--------------- utils/bucket.go | 6 ++---- 3 files changed, 56 insertions(+), 30 deletions(-) diff --git a/node/node.go b/node/node.go index b451548..13aece4 100644 --- a/node/node.go +++ b/node/node.go @@ -2,6 +2,7 @@ package node import ( "errors" + "fmt" "git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/interfaces" @@ -31,7 +32,6 @@ type NodeImpl struct { started bool hashQueryRoutingTable structs.Map services interfaces.Services - cacheBucket *bolt.Bucket httpClient *resty.Client connections sync.WaitGroup } @@ -86,9 +86,7 @@ func (n *NodeImpl) Start() error { protocol.Init() signed.Init() err := - utils.CreateBucket(cacheBucketName, n.Db(), func(bucket *bolt.Bucket) { - n.cacheBucket = bucket - }) + utils.CreateBucket(cacheBucketName, n.Db()) if err != nil { return err @@ -157,20 +155,31 @@ func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []t return locations, nil } func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { - locationMap := storage.NewStorageLocationMap() + var locationMap storage.StorageLocationMap - bytes := n.cacheBucket.Get(hash.FullBytes()) - if bytes == nil { - return locationMap, nil - } + err := n.Db().View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name + if b == nil { + return fmt.Errorf("bucket %s not found", cacheBucketName) + } + + bytes := b.Get(hash.FullBytes()) + if bytes == nil { + // If no data found, return an empty locationMap but no error + locationMap = storage.NewStorageLocationMap() + return nil + } + + return msgpack.Unmarshal(bytes, &locationMap) + }) - err := msgpack.Unmarshal(bytes, locationMap) if err != nil { return nil, err } return locationMap, nil } + func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location interfaces.StorageLocation, message []byte, config *config.NodeConfig) error { // Read existing storage locations locationDb, err := n.readStorageLocationsFromDB(hash) @@ -205,8 +214,11 @@ func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding if err != nil { return err } + err = n.Db().Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(cacheBucketName)) - err = n.cacheBucket.Put(hash.FullBytes(), packedBytes) + return b.Put(hash.FullBytes(), packedBytes) + }) if err != nil { return err } diff --git a/service/p2p.go b/service/p2p.go index e5685e6..e80b334 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -321,13 +321,29 @@ func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) { } func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, error) { - node := p.nodesBucket.Get(nodeId.Raw()) - if node == nil { + var value []byte + var found bool + err := p.node.Db().View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(nodeBucketName)) + if b == nil { + return fmt.Errorf("Bucket %s not found", nodeBucketName) + } + value = b.Get(nodeId.Raw()) + if value == nil { + return nil + } + found = true + return nil + }) + if err != nil { + return nil, err + } + if !found { return NewNodeVotes(), nil } var score interfaces.NodeVotes - err := msgpack.Unmarshal(node, &score) + err = msgpack.Unmarshal(value, &score) if err != nil { return nil, err } @@ -335,23 +351,23 @@ func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, return score, nil } -func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, error) { - votes, err := p.readNodeVotes(nodeId) - if err != nil { - return nil, err - } - +func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId, votes interfaces.NodeVotes) error { + // Marshal the votes into data data, err := msgpack.Marshal(votes) if err != nil { - return nil, err + return err } - err = p.nodesBucket.Put(nodeId.Raw(), data) - if err != nil { - return nil, err - } + // Use a transaction to save the data + err = p.node.Db().Update(func(tx *bolt.Tx) error { + // Get or create the bucket + b := tx.Bucket([]byte(nodeBucketName)) - return votes, nil + // Put the data into the bucket + return b.Put(nodeId.Raw(), data) + }) + + return err } func (p *P2PImpl) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { diff --git a/utils/bucket.go b/utils/bucket.go index 7646b4a..45984f0 100644 --- a/utils/bucket.go +++ b/utils/bucket.go @@ -2,16 +2,14 @@ package utils import bolt "go.etcd.io/bbolt" -func CreateBucket(name string, db *bolt.DB, cb func(bucket *bolt.Bucket)) error { +func CreateBucket(name string, db *bolt.DB) error { err := db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists([]byte(name)) + _, err := tx.CreateBucketIfNotExists([]byte(name)) if err != nil { return err } - cb(bucket) - return nil })