fix: we need to run all bucket actions via transactions and cannot store a pointer to the bucket

This commit is contained in:
Derrick Hammer 2024-01-09 15:49:23 -05:00
parent 1f01f40338
commit c137d75b24
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
3 changed files with 56 additions and 30 deletions

View File

@ -2,6 +2,7 @@ package node
import ( import (
"errors" "errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/interfaces" "git.lumeweb.com/LumeWeb/libs5-go/interfaces"
@ -31,7 +32,6 @@ type NodeImpl struct {
started bool started bool
hashQueryRoutingTable structs.Map hashQueryRoutingTable structs.Map
services interfaces.Services services interfaces.Services
cacheBucket *bolt.Bucket
httpClient *resty.Client httpClient *resty.Client
connections sync.WaitGroup connections sync.WaitGroup
} }
@ -86,9 +86,7 @@ func (n *NodeImpl) Start() error {
protocol.Init() protocol.Init()
signed.Init() signed.Init()
err := err :=
utils.CreateBucket(cacheBucketName, n.Db(), func(bucket *bolt.Bucket) { utils.CreateBucket(cacheBucketName, n.Db())
n.cacheBucket = bucket
})
if err != nil { if err != nil {
return err return err
@ -157,20 +155,31 @@ func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []t
return locations, nil return locations, nil
} }
func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { func (n *NodeImpl) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
locationMap := storage.NewStorageLocationMap() var locationMap storage.StorageLocationMap
bytes := n.cacheBucket.Get(hash.FullBytes()) err := n.Db().View(func(tx *bolt.Tx) error {
if bytes == nil { b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name
return locationMap, nil if b == nil {
return fmt.Errorf("bucket %s not found", cacheBucketName)
} }
err := msgpack.Unmarshal(bytes, locationMap) bytes := b.Get(hash.FullBytes())
if bytes == nil {
// If no data found, return an empty locationMap but no error
locationMap = storage.NewStorageLocationMap()
return nil
}
return msgpack.Unmarshal(bytes, &locationMap)
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return locationMap, nil return locationMap, nil
} }
func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location interfaces.StorageLocation, message []byte, config *config.NodeConfig) error { func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location interfaces.StorageLocation, message []byte, config *config.NodeConfig) error {
// Read existing storage locations // Read existing storage locations
locationDb, err := n.readStorageLocationsFromDB(hash) locationDb, err := n.readStorageLocationsFromDB(hash)
@ -205,8 +214,11 @@ func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding
if err != nil { if err != nil {
return err return err
} }
err = n.Db().Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName))
err = n.cacheBucket.Put(hash.FullBytes(), packedBytes) return b.Put(hash.FullBytes(), packedBytes)
})
if err != nil { if err != nil {
return err return err
} }

View File

@ -321,13 +321,29 @@ func (p *P2PImpl) OnNewPeerListen(peer net.Peer, verifyId bool) {
} }
func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, error) { func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, error) {
node := p.nodesBucket.Get(nodeId.Raw()) var value []byte
if node == nil { var found bool
err := p.node.Db().View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(nodeBucketName))
if b == nil {
return fmt.Errorf("Bucket %s not found", nodeBucketName)
}
value = b.Get(nodeId.Raw())
if value == nil {
return nil
}
found = true
return nil
})
if err != nil {
return nil, err
}
if !found {
return NewNodeVotes(), nil return NewNodeVotes(), nil
} }
var score interfaces.NodeVotes var score interfaces.NodeVotes
err := msgpack.Unmarshal(node, &score) err = msgpack.Unmarshal(value, &score)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -335,23 +351,23 @@ func (p *P2PImpl) readNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes,
return score, nil return score, nil
} }
func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId) (interfaces.NodeVotes, error) { func (p *P2PImpl) saveNodeVotes(nodeId *encoding.NodeId, votes interfaces.NodeVotes) error {
votes, err := p.readNodeVotes(nodeId) // Marshal the votes into data
if err != nil {
return nil, err
}
data, err := msgpack.Marshal(votes) data, err := msgpack.Marshal(votes)
if err != nil { if err != nil {
return nil, err return err
} }
err = p.nodesBucket.Put(nodeId.Raw(), data) // Use a transaction to save the data
if err != nil { err = p.node.Db().Update(func(tx *bolt.Tx) error {
return nil, err // Get or create the bucket
} b := tx.Bucket([]byte(nodeBucketName))
return votes, nil // Put the data into the bucket
return b.Put(nodeId.Raw(), data)
})
return err
} }
func (p *P2PImpl) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { func (p *P2PImpl) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {

View File

@ -2,16 +2,14 @@ package utils
import bolt "go.etcd.io/bbolt" import bolt "go.etcd.io/bbolt"
func CreateBucket(name string, db *bolt.DB, cb func(bucket *bolt.Bucket)) error { func CreateBucket(name string, db *bolt.DB) error {
err := err :=
db.Update(func(tx *bolt.Tx) error { db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(name)) _, err := tx.CreateBucketIfNotExists([]byte(name))
if err != nil { if err != nil {
return err return err
} }
cb(bucket)
return nil return nil
}) })