refactor: further refactoring for DI, splitting node responsibilities to a new Storage service, Services, and P2P

This commit is contained in:
Derrick Hammer 2024-01-29 01:55:36 -05:00
parent 59a73e4266
commit b60979e79d
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
15 changed files with 396 additions and 325 deletions

View File

@ -1,38 +1,17 @@
package node package node
import ( import (
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/go-resty/resty/v2"
"github.com/vmihailenco/msgpack/v5"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
"time"
) )
const cacheBucketName = "object-cache"
type Node struct { type Node struct {
nodeConfig *config.NodeConfig nodeConfig *config.NodeConfig
metadataCache structs.Map
hashQueryRoutingTable structs.Map
services service.Services services service.Services
httpClient *resty.Client
providerStore storage.ProviderStore
}
func (n *Node) NetworkId() string {
return n.nodeConfig.P2P.Network
} }
func (n *Node) Services() service.Services { func (n *Node) Services() service.Services {
@ -42,15 +21,9 @@ func (n *Node) Services() service.Services {
func NewNode(config *config.NodeConfig, services service.Services) *Node { func NewNode(config *config.NodeConfig, services service.Services) *Node {
return &Node{ return &Node{
nodeConfig: config, nodeConfig: config,
metadataCache: structs.NewMap(),
hashQueryRoutingTable: structs.NewMap(),
httpClient: resty.New(),
services: services, // Services are passed in, not created here services: services, // Services are passed in, not created here
} }
} }
func (n *Node) HashQueryRoutingTable() structs.Map {
return n.hashQueryRoutingTable
}
func (n *Node) IsStarted() bool { func (n *Node) IsStarted() bool {
return n.services.IsStarted() return n.services.IsStarted()
@ -77,12 +50,6 @@ func (n *Node) Db() *bolt.DB {
func (n *Node) Start() error { func (n *Node) Start() error {
protocol.RegisterProtocols() protocol.RegisterProtocols()
signed.RegisterSignedProtocols() signed.RegisterSignedProtocols()
err :=
utils.CreateBucket(cacheBucketName, n.Db())
if err != nil {
return err
}
return n.services.Start() return n.services.Start()
} }
@ -91,236 +58,10 @@ func (n *Node) Stop() error {
return n.services.Stop() return n.services.Stop()
} }
func (n *Node) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) {
locations := make(map[string]storage.StorageLocation)
locationMap, err := n.readStorageLocationsFromDB(hash)
if err != nil {
return nil, err
}
if len(locationMap) == 0 {
return make(map[string]storage.StorageLocation), nil
}
ts := time.Now().Unix()
for _, t := range kinds {
nodeMap, ok := (locationMap)[int(t)]
if !ok {
continue
}
for key, value := range nodeMap {
expiry, ok := value[3].(int64)
if !ok || expiry < ts {
continue
}
addressesInterface, ok := value[1].([]interface{})
if !ok {
continue
}
// Create a slice to hold the strings
addresses := make([]string, len(addressesInterface))
// Convert each element to string
for i, v := range addressesInterface {
str, ok := v.(string)
if !ok {
// Handle the error, maybe skip this element or set a default value
continue
}
addresses[i] = str
}
storageLocation := storage.NewStorageLocation(int(t), addresses, expiry)
if providerMessage, ok := value[4].([]byte); ok {
(storageLocation).SetProviderMessage(providerMessage)
}
locations[key] = storageLocation
}
}
return locations, nil
}
func (n *Node) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
var locationMap storage.StorageLocationMap
err := n.Db().View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name
if b == nil {
return fmt.Errorf("bucket %s not found", cacheBucketName)
}
bytes := b.Get(hash.FullBytes())
if bytes == nil {
// If no data found, return an empty locationMap but no error
locationMap = storage.NewStorageLocationMap()
return nil
}
return msgpack.Unmarshal(bytes, &locationMap)
})
if err != nil {
return nil, err
}
return locationMap, nil
}
func (n *Node) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error {
// Read existing storage locations
locationDb, err := n.readStorageLocationsFromDB(hash)
if err != nil {
return err
}
nodeIdStr, err := nodeId.ToString()
if err != nil {
return err
}
// Get or create the inner map for the specific type
innerMap, exists := locationDb[location.Type()]
if !exists {
innerMap = make(storage.NodeStorage, 1)
innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1)
}
// Create location map with new data
locationMap := make(map[int]interface{}, 3)
locationMap[1] = location.Parts()
locationMap[3] = location.Expiry()
locationMap[4] = message
// Update the inner map with the new location
innerMap[nodeIdStr] = locationMap
locationDb[location.Type()] = innerMap
// Serialize the updated map and store it in the database
packedBytes, err := msgpack.Marshal(locationDb)
if err != nil {
return err
}
err = n.Db().Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName))
return b.Put(hash.FullBytes(), packedBytes)
})
if err != nil {
return err
}
return nil
}
func (n *Node) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
// Initialize the download URI provider
dlUriProvider := storage.NewStorageLocationProvider(n, hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile)
err := dlUriProvider.Start()
if err != nil {
return nil, err
}
retryCount := 0
for {
dlUri, err := dlUriProvider.Next()
if err != nil {
return nil, err
}
n.Logger().Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL()))
res, err := n.httpClient.R().Get(dlUri.Location().BytesURL())
if err != nil {
err := dlUriProvider.Downvote(dlUri)
if err != nil {
return nil, err
}
retryCount++
if retryCount > 32 {
return nil, errors.New("too many retries")
}
continue
}
bodyBytes := res.Body()
return bodyBytes, nil
}
}
func (n *Node) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) {
bytes, err = n.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
return bytes, nil
}
func (n *Node) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) {
hashStr, err := cid.Hash.ToString()
if err != nil {
return nil, err
}
if n.metadataCache.Contains(hashStr) {
md, _ := n.metadataCache.Get(hashStr)
return md.(metadata.Metadata), nil
}
bytes, err := n.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
switch cid.Type {
case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method
md = metadata.NewEmptyMediaMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeMetadataWebapp:
md = metadata.NewEmptyWebAppMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeDirectory:
md = metadata.NewEmptyDirectoryMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported metadata format")
}
n.metadataCache.Put(hashStr, md)
return md, nil
}
func (n *Node) WaitOnConnectedPeers() { func (n *Node) WaitOnConnectedPeers() {
n.services.P2P().WaitOnConnectedPeers() n.services.P2P().WaitOnConnectedPeers()
} }
func (n *Node) SetProviderStore(store storage.ProviderStore) {
n.providerStore = store
}
func (n *Node) ProviderStore() storage.ProviderStore {
return n.providerStore
}
func DefaultNode(config *config.NodeConfig) *Node { func DefaultNode(config *config.NodeConfig) *Node {
params := service.ServiceParams{ params := service.ServiceParams{
Logger: config.Logger, Logger: config.Logger,

View File

@ -12,12 +12,14 @@ type ServicesParams struct {
P2P *service.P2PService P2P *service.P2PService
Registry *service.RegistryService Registry *service.RegistryService
HTTP *service.HTTPService HTTP *service.HTTPService
Storage *service.StorageService
} }
type ServicesImpl struct { type ServicesImpl struct {
p2p *service.P2PService p2p *service.P2PService
registry *service.RegistryService registry *service.RegistryService
http *service.HTTPService http *service.HTTPService
storage *service.StorageService
started bool started bool
} }
@ -25,11 +27,16 @@ func (s *ServicesImpl) HTTP() *service.HTTPService {
return s.http return s.http
} }
func (s *ServicesImpl) Storage() *service.StorageService {
return s.storage
}
func (s *ServicesImpl) All() []service.Service { func (s *ServicesImpl) All() []service.Service {
services := make([]service.Service, 0) services := make([]service.Service, 0)
services = append(services, s.p2p) services = append(services, s.p2p)
services = append(services, s.registry) services = append(services, s.registry)
services = append(services, s.http) services = append(services, s.http)
services = append(services, s.storage)
return services return services
} }
@ -43,6 +50,7 @@ func NewServices(params ServicesParams) service.Services {
p2p: params.P2P, p2p: params.P2P,
registry: params.Registry, registry: params.Registry,
http: params.HTTP, http: params.HTTP,
storage: params.Storage,
started: false, started: false,
} }

View File

@ -3,8 +3,9 @@ package base
import ( import (
"context" "context"
"git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/service"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap"
"io" "io"
) )
@ -22,7 +23,8 @@ type IncomingMessageData struct {
Original []byte Original []byte
Data []byte Data []byte
Ctx context.Context Ctx context.Context
Node *node.Node Services service.Services
Logger *zap.Logger
Peer net.Peer Peer net.Peer
VerifyId bool VerifyId bool
} }

View File

@ -94,21 +94,21 @@ func (h *HandshakeOpen) DecodeMessage(dec *msgpack.Decoder, message base.Incomin
} }
func (h *HandshakeOpen) HandleMessage(message base.IncomingMessageData) error { func (h *HandshakeOpen) HandleMessage(message base.IncomingMessageData) error {
node := message.Node
peer := message.Peer peer := message.Peer
services := message.Services
if h.networkId != node.NetworkId() { if h.networkId != services.P2P().NetworkId() {
return fmt.Errorf("Peer is in different network: %s", h.networkId) return fmt.Errorf("Peer is in different network: %s", h.networkId)
} }
handshake := signed.NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, node.Services().P2P().SelfConnectionUris()) handshake := signed.NewHandshakeDoneRequest(h.handshake, types.SupportedFeatures, services.P2P().SelfConnectionUris())
hsMessage, err := msgpack.Marshal(handshake) hsMessage, err := msgpack.Marshal(handshake)
if err != nil { if err != nil {
return err return err
} }
secureMessage, err := node.Services().P2P().SignMessageSimple(hsMessage) secureMessage, err := services.P2P().SignMessageSimple(hsMessage)
if err != nil { if err != nil {
return err return err

View File

@ -88,10 +88,11 @@ func (h HashQuery) EncodeMsgpack(enc *msgpack.Encoder) error {
} }
func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error { func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error {
node := message.Node
peer := message.Peer peer := message.Peer
services := message.Services
logger := message.Logger
mapLocations, err := node.GetCachedStorageLocations(h.hash, h.kinds) mapLocations, err := services.Storage().GetCachedStorageLocations(h.hash, h.kinds)
if err != nil { if err != nil {
log.Printf("Error getting cached storage locations: %v", err) log.Printf("Error getting cached storage locations: %v", err)
return err return err
@ -102,14 +103,14 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error {
for key := range mapLocations { for key := range mapLocations {
nodeId, err := encoding.DecodeNodeId(key) nodeId, err := encoding.DecodeNodeId(key)
if err != nil { if err != nil {
node.Logger().Error("Error decoding node id", zap.Error(err)) logger.Error("Error decoding node id", zap.Error(err))
continue continue
} }
availableNodes = append(availableNodes, nodeId) availableNodes = append(availableNodes, nodeId)
} }
score, err := node.Services().P2P().SortNodesByScore(availableNodes) score, err := services.P2P().SortNodesByScore(availableNodes)
if err != nil { if err != nil {
return err return err
} }
@ -128,16 +129,16 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error {
} }
} }
if node.ProviderStore() != nil { if services.Storage().ProviderStore() != nil {
if node.ProviderStore().CanProvide(h.hash, h.kinds) { if services.Storage().ProviderStore().CanProvide(h.hash, h.kinds) {
location, err := node.ProviderStore().Provide(h.hash, h.kinds) location, err := services.Storage().ProviderStore().Provide(h.hash, h.kinds)
if err != nil { if err != nil {
return err return err
} }
message := node.Services().P2P().PrepareProvideMessage(h.hash, location) message := services.P2P().PrepareProvideMessage(h.hash, location)
err = node.AddStorageLocation(h.hash, node.Services().P2P().NodeId(), location, message) err = services.Storage().AddStorageLocation(h.hash, services.P2P().NodeId(), location, message)
if err != nil { if err != nil {
return err return err
} }
@ -151,11 +152,11 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error {
var peers *hashset.Set var peers *hashset.Set
hashString, err := h.hash.ToString() hashString, err := h.hash.ToString()
node.Logger().Debug("HashQuery", zap.Any("hashString", hashString)) logger.Debug("HashQuery", zap.Any("hashString", hashString))
if err != nil { if err != nil {
return err return err
} }
peersVal, ok := node.HashQueryRoutingTable().Get(hashString) // Implement HashQueryRoutingTable method peersVal, ok := services.P2P().HashQueryRoutingTable().Get(hashString) // Implement HashQueryRoutingTable method
if ok { if ok {
peers = peersVal.(*hashset.Set) peers = peersVal.(*hashset.Set)
if !peers.Contains(peer.Id()) { if !peers.Contains(peer.Id()) {
@ -168,14 +169,14 @@ func (h *HashQuery) HandleMessage(message base.IncomingMessageData) error {
peerList := hashset.New() peerList := hashset.New()
peerList.Add(peer.Id()) peerList.Add(peer.Id())
node.HashQueryRoutingTable().Put(hashString, peerList) services.P2P().HashQueryRoutingTable().Put(hashString, peerList)
for _, val := range node.Services().P2P().Peers().Values() { for _, val := range services.P2P().Peers().Values() {
peerVal := val.(net.Peer) peerVal := val.(net.Peer)
if !peerVal.Id().Equals(peer.Id()) { if !peerVal.Id().Equals(peer.Id()) {
err := peerVal.SendMessage(message.Original) err := peerVal.SendMessage(message.Original)
if err != nil { if err != nil {
node.Logger().Error("Failed to send message", zap.Error(err)) logger.Error("Failed to send message", zap.Error(err))
} }
} }
} }

View File

@ -51,7 +51,7 @@ func (s *RegistryEntryRequest) DecodeMessage(dec *msgpack.Decoder, message base.
} }
func (s *RegistryEntryRequest) HandleMessage(message base.IncomingMessageData) error { func (s *RegistryEntryRequest) HandleMessage(message base.IncomingMessageData) error {
node := message.Node
peer := message.Peer peer := message.Peer
return node.Services().Registry().Set(s.sre, false, peer) services := message.Services
return services.Registry().Set(s.sre, false, peer)
} }

View File

@ -51,9 +51,9 @@ func (s *RegistryQuery) DecodeMessage(dec *msgpack.Decoder, message base.Incomin
} }
func (s *RegistryQuery) HandleMessage(message base.IncomingMessageData) error { func (s *RegistryQuery) HandleMessage(message base.IncomingMessageData) error {
node := message.Node services := message.Services
peer := message.Peer peer := message.Peer
sre, err := node.Services().Registry().Get(s.pk) sre, err := services.Registry().Get(s.pk)
if err != nil { if err != nil {
return err return err
} }

View File

@ -106,10 +106,10 @@ func (a *AnnouncePeers) DecodeMessage(dec *msgpack.Decoder, message IncomingMess
} }
func (a AnnouncePeers) HandleMessage(message IncomingMessageDataSigned) error { func (a AnnouncePeers) HandleMessage(message IncomingMessageDataSigned) error {
node := message.Node services := message.Services
peer := message.Peer peer := message.Peer
if len(a.connectionUris) > 0 { if len(a.connectionUris) > 0 {
err := node.Services().P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer) err := services.P2P().ConnectToNode([]*url.URL{a.connectionUris[0]}, false, peer)
if err != nil { if err != nil {
return err return err
} }

View File

@ -77,12 +77,13 @@ func NewHandshakeDone() *HandshakeDone {
} }
func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error { func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error {
node := message.Node services := message.Services
peer := message.Peer peer := message.Peer
verifyId := message.VerifyId verifyId := message.VerifyId
nodeId := message.NodeId nodeId := message.NodeId
logger := message.Logger
if !node.IsStarted() { if !services.IsStarted() {
err := peer.End() err := peer.End()
if err != nil { if err != nil {
return nil return nil
@ -108,7 +109,7 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error {
if h.supportedFeatures != types.SupportedFeatures { if h.supportedFeatures != types.SupportedFeatures {
return fmt.Errorf("Remote node does not support required features") return fmt.Errorf("Remote node does not support required features")
} }
err := node.Services().P2P().AddPeer(peer) err := services.P2P().AddPeer(peer)
if err != nil { if err != nil {
return err return err
} }
@ -121,9 +122,9 @@ func (h HandshakeDone) HandleMessage(message IncomingMessageDataSigned) error {
return err return err
} }
node.Logger().Info(fmt.Sprintf("[+] %s (%s)", peerId, peer.RenderLocationURI())) logger.Info(fmt.Sprintf("[+] %s (%s)", peerId, peer.RenderLocationURI()))
err = node.Services().P2P().SendPublicPeersToPeer(peer, []net.Peer{peer}) err = services.P2P().SendPublicPeersToPeer(peer, []net.Peer{peer})
if err != nil { if err != nil {
return err return err
} }

View File

@ -83,8 +83,8 @@ func NewSignedMessage() *SignedMessage {
func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error { func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error {
var payload signedMessageReader var payload signedMessageReader
node := message.Node
peer := message.Peer peer := message.Peer
logger := message.Logger
err := msgpack.Unmarshal(s.message, &payload) err := msgpack.Unmarshal(s.message, &payload)
if err != nil { if err != nil {
@ -92,9 +92,9 @@ func (s *SignedMessage) HandleMessage(message base.IncomingMessageData) error {
} }
if msgHandler, valid := GetMessageType(payload.kind); valid { if msgHandler, valid := GetMessageType(payload.kind); valid {
node.Logger().Debug("SignedMessage", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)])) logger.Debug("SignedMessage", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)]))
if msgHandler.RequiresHandshake() && !peer.IsHandshakeDone() { if msgHandler.RequiresHandshake() && !peer.IsHandshakeDone() {
node.Logger().Debug("Peer is not handshake done, ignoring message", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)])) logger.Debug("Peer is not handshake done, ignoring message", zap.Any("type", types.ProtocolMethodMap[types.ProtocolMethod(payload.kind)]))
return nil return nil
} }
err := msgpack.Unmarshal(payload.message, &msgHandler) err := msgpack.Unmarshal(payload.message, &msgHandler)

View File

@ -40,8 +40,9 @@ func (s *StorageLocation) DecodeMessage(dec *msgpack.Decoder, message base.Incom
} }
func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error { func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error {
msg := message.Original msg := message.Original
node := message.Node services := message.Services
peer := message.Peer peer := message.Peer
logger := message.Logger
hash := encoding.NewMultihash(msg[1:34]) // Replace NewMultihash with appropriate function hash := encoding.NewMultihash(msg[1:34]) // Replace NewMultihash with appropriate function
@ -80,7 +81,7 @@ func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error
nodeId := encoding.NewNodeId(publicKey) nodeId := encoding.NewNodeId(publicKey)
// Assuming `node` is an instance of your NodeImpl structure // Assuming `node` is an instance of your NodeImpl structure
err := node.AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg) // Implement AddStorageLocation err := services.Storage().AddStorageLocation(hash, nodeId, storage.NewStorageLocation(int(typeOfData), parts, int64(expiry)), msg) // Implement AddStorageLocation
if err != nil { if err != nil {
return fmt.Errorf("Failed to add storage location: %s", err) return fmt.Errorf("Failed to add storage location: %s", err)
@ -92,7 +93,7 @@ func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error
} }
var list *hashset.Set var list *hashset.Set
listVal, ok := node.HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method listVal, ok := services.P2P().HashQueryRoutingTable().Get(hashStr) // Implement HashQueryRoutingTable method
if !ok { if !ok {
list = hashset.New() list = hashset.New()
} else { } else {
@ -109,16 +110,16 @@ func (s *StorageLocation) HandleMessage(message base.IncomingMessageData) error
if err != nil { if err != nil {
return err return err
} }
if peerVal, ok := node.Services().P2P().Peers().Get(peerIdStr); ok { if peerVal, ok := services.P2P().Peers().Get(peerIdStr); ok {
foundPeer := peerVal.(net.Peer) foundPeer := peerVal.(net.Peer)
err := foundPeer.SendMessage(msg) err := foundPeer.SendMessage(msg)
if err != nil { if err != nil {
node.Logger().Error("Failed to send message", zap.Error(err)) logger.Error("Failed to send message", zap.Error(err))
continue continue
} }
} }
node.HashQueryRoutingTable().Remove(hash.HashCode()) services.P2P().HashQueryRoutingTable().Remove(hash.HashCode())
} }
return nil return nil

View File

@ -50,6 +50,7 @@ type P2PService struct {
outgoingPeerFailures structs.Map outgoingPeerFailures structs.Map
maxOutgoingPeerFailures uint maxOutgoingPeerFailures uint
connections sync.WaitGroup connections sync.WaitGroup
hashQueryRoutingTable structs.Map
ServiceBase ServiceBase
} }
@ -72,6 +73,7 @@ func NewP2P(params ServiceParams) *P2PService {
incomingIPBlocklist: structs.NewMap(), incomingIPBlocklist: structs.NewMap(),
outgoingPeerFailures: structs.NewMap(), outgoingPeerFailures: structs.NewMap(),
maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures, maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures,
hashQueryRoutingTable: structs.NewMap(),
ServiceBase: ServiceBase{ ServiceBase: ServiceBase{
logger: params.Logger, logger: params.Logger,
config: params.Config, config: params.Config,
@ -711,3 +713,10 @@ func (p *P2PService) WaitOnConnectedPeers() {
func (p *P2PService) ConnectionTracker() *sync.WaitGroup { func (p *P2PService) ConnectionTracker() *sync.WaitGroup {
return &p.connections return &p.connections
} }
func (p *P2PService) NetworkId() string {
return p.config.P2P.Network
}
func (n *P2PService) HashQueryRoutingTable() structs.Map {
return n.hashQueryRoutingTable
}

View File

@ -16,6 +16,7 @@ type Services interface {
P2P() *P2PService P2P() *P2PService
Registry() *RegistryService Registry() *RegistryService
HTTP() *HTTPService HTTP() *HTTPService
Storage() *StorageService
All() []Service All() []Service
IsStarted() bool IsStarted() bool
Start() error Start() error

300
service/storage.go Normal file
View File

@ -0,0 +1,300 @@
package service
import (
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/go-resty/resty/v2"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
)
const cacheBucketName = "object-cache"
var (
_ Service = (*StorageService)(nil)
)
type StorageService struct {
httpClient *resty.Client
metadataCache structs.Map
providerStore storage.ProviderStore
ServiceBase
}
func NewStorage(params ServiceParams) *StorageService {
return &StorageService{
httpClient: resty.New(),
metadataCache: structs.NewMap(),
ServiceBase: ServiceBase{
logger: params.Logger,
config: params.Config,
db: params.Db,
},
}
}
func (s *StorageService) Start() error {
err :=
utils.CreateBucket(cacheBucketName, s.db)
if err != nil {
return err
}
return nil
}
func (s *StorageService) Stop() error {
return nil
}
func (s *StorageService) Init() error {
return nil
}
func (n *StorageService) SetProviderStore(store storage.ProviderStore) {
n.providerStore = store
}
func (n *StorageService) ProviderStore() storage.ProviderStore {
return n.providerStore
}
func (s *StorageService) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) {
locations := make(map[string]storage.StorageLocation)
locationMap, err := s.readStorageLocationsFromDB(hash)
if err != nil {
return nil, err
}
if len(locationMap) == 0 {
return make(map[string]storage.StorageLocation), nil
}
ts := time.Now().Unix()
for _, t := range kinds {
nodeMap, ok := (locationMap)[int(t)]
if !ok {
continue
}
for key, value := range nodeMap {
expiry, ok := value[3].(int64)
if !ok || expiry < ts {
continue
}
addressesInterface, ok := value[1].([]interface{})
if !ok {
continue
}
// Create a slice to hold the strings
addresses := make([]string, len(addressesInterface))
// Convert each element to string
for i, v := range addressesInterface {
str, ok := v.(string)
if !ok {
// Handle the error, maybe skip this element or set a default value
continue
}
addresses[i] = str
}
storageLocation := storage.NewStorageLocation(int(t), addresses, expiry)
if providerMessage, ok := value[4].([]byte); ok {
(storageLocation).SetProviderMessage(providerMessage)
}
locations[key] = storageLocation
}
}
return locations, nil
}
func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
var locationMap storage.StorageLocationMap
err := s.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name
if b == nil {
return fmt.Errorf("bucket %s not found", cacheBucketName)
}
bytes := b.Get(hash.FullBytes())
if bytes == nil {
// If no data found, return an empty locationMap but no error
locationMap = storage.NewStorageLocationMap()
return nil
}
return msgpack.Unmarshal(bytes, &locationMap)
})
if err != nil {
return nil, err
}
return locationMap, nil
}
func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error {
// Read existing storage locations
locationDb, err := s.readStorageLocationsFromDB(hash)
if err != nil {
return err
}
nodeIdStr, err := nodeId.ToString()
if err != nil {
return err
}
// Get or create the inner map for the specific type
innerMap, exists := locationDb[location.Type()]
if !exists {
innerMap = make(storage.NodeStorage, 1)
innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1)
}
// Create location map with new data
locationMap := make(map[int]interface{}, 3)
locationMap[1] = location.Parts()
locationMap[3] = location.Expiry()
locationMap[4] = message
// Update the inner map with the new location
innerMap[nodeIdStr] = locationMap
locationDb[location.Type()] = innerMap
// Serialize the updated map and store it in the database
packedBytes, err := msgpack.Marshal(locationDb)
if err != nil {
return err
}
err = s.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName))
return b.Put(hash.FullBytes(), packedBytes)
})
if err != nil {
return err
}
return nil
}
func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
// Initialize the download URI provider
dlUriProvider := storage.NewStorageLocationProvider(storage.StorageLocationProviderParams{
Services: s.services,
Hash: hash,
LocationTypes: []types.StorageLocationType{
types.StorageLocationTypeFull,
types.StorageLocationTypeFile,
},
ServiceParams: ServiceParams{
Logger: s.logger,
Config: s.config,
Db: s.db,
},
})
err := dlUriProvider.Start()
if err != nil {
return nil, err
}
retryCount := 0
for {
dlUri, err := dlUriProvider.Next()
if err != nil {
return nil, err
}
s.logger.Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL()))
res, err := s.httpClient.R().Get(dlUri.Location().BytesURL())
if err != nil {
err := dlUriProvider.Downvote(dlUri)
if err != nil {
return nil, err
}
retryCount++
if retryCount > 32 {
return nil, errors.New("too many retries")
}
continue
}
bodyBytes := res.Body()
return bodyBytes, nil
}
}
func (s *StorageService) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) {
bytes, err = s.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
return bytes, nil
}
func (s *StorageService) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) {
hashStr, err := cid.Hash.ToString()
if err != nil {
return nil, err
}
if s.metadataCache.Contains(hashStr) {
md, _ := s.metadataCache.Get(hashStr)
return md.(metadata.Metadata), nil
}
bytes, err := s.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
switch cid.Type {
case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method
md = metadata.NewEmptyMediaMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeMetadataWebapp:
md = metadata.NewEmptyWebAppMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeDirectory:
md = metadata.NewEmptyDirectoryMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported metadata format")
}
s.metadataCache.Put(hashStr, md)
return md, nil
}

View File

@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
_node "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap" "go.uber.org/zap"
@ -33,6 +33,13 @@ type StorageLocationImpl struct {
providerMessage []byte providerMessage []byte
} }
type StorageLocationProviderParams struct {
Services service.Services
Hash *encoding.Multihash
LocationTypes []types.StorageLocationType
service.ServiceParams
}
func (s *StorageLocationImpl) Type() int { func (s *StorageLocationImpl) Type() int {
return s.kind return s.kind
} }
@ -184,7 +191,7 @@ func NewStorageLocationMap() StorageLocationMap {
} }
type StorageLocationProviderImpl struct { type StorageLocationProviderImpl struct {
node *_node.Node services service.Services
hash *encoding.Multihash hash *encoding.Multihash
types []types.StorageLocationType types []types.StorageLocationType
timeoutDuration time.Duration timeoutDuration time.Duration
@ -194,12 +201,13 @@ type StorageLocationProviderImpl struct {
isTimedOut bool isTimedOut bool
isWaitingForUri bool isWaitingForUri bool
mutex sync.Mutex mutex sync.Mutex
logger *zap.Logger
} }
func (s *StorageLocationProviderImpl) Start() error { func (s *StorageLocationProviderImpl) Start() error {
var err error var err error
s.uris, err = s.node.GetCachedStorageLocations(s.hash, s.types) s.uris, err = s.services.Storage().GetCachedStorageLocations(s.hash, s.types)
if err != nil { if err != nil {
return err return err
} }
@ -212,10 +220,9 @@ func (s *StorageLocationProviderImpl) Start() error {
} }
s.availableNodes = append(s.availableNodes, nodeId) s.availableNodes = append(s.availableNodes, nodeId)
} }
s.availableNodes, err = s.node.Services().P2P().SortNodesByScore(s.availableNodes) s.availableNodes, err = s.services.P2P().SortNodesByScore(s.availableNodes)
if err != nil { if err != nil {
s.mutex.Unlock() s.mutex.Unlock()
return err return err
@ -235,18 +242,17 @@ func (s *StorageLocationProviderImpl) Start() error {
break break
} }
newUris, err := s.node.GetCachedStorageLocations(s.hash, s.types) newUris, err := s.services.Storage().GetCachedStorageLocations(s.hash, s.types)
if err != nil { if err != nil {
s.mutex.Unlock() s.mutex.Unlock()
break break
} }
//s.node.Logger().Debug("New URIs", zap.Any("uris", newUris), zap.Any("availableNodes", s.availableNodes), zap.Any("timeout", s.timeout), zap.Any("isTimedOut", s.isTimedOut), zap.Any("isWaitingForUri", s.isWaitingForUri), zap.Any("requestSent", requestSent))
if len(s.availableNodes) == 0 && len(newUris) < 2 && !requestSent { if len(s.availableNodes) == 0 && len(newUris) < 2 && !requestSent {
s.node.Logger().Debug("Sending hash request") s.logger.Debug("Sending hash request")
err := s.node.Services().P2P().SendHashRequest(s.hash, s.types) err := s.services.P2P().SendHashRequest(s.hash, s.types)
if err != nil { if err != nil {
s.node.Logger().Error("Error sending hash request", zap.Error(err)) s.logger.Error("Error sending hash request", zap.Error(err))
continue continue
} }
requestSent = true requestSent = true
@ -258,7 +264,7 @@ func (s *StorageLocationProviderImpl) Start() error {
s.uris[k] = v s.uris[k] = v
nodeId, err := encoding.DecodeNodeId(k) nodeId, err := encoding.DecodeNodeId(k)
if err != nil { if err != nil {
s.node.Logger().Error("Error decoding node id", zap.Error(err)) s.logger.Error("Error decoding node id", zap.Error(err))
continue continue
} }
if !containsNode(s.availableNodes, nodeId) { if !containsNode(s.availableNodes, nodeId) {
@ -269,9 +275,9 @@ func (s *StorageLocationProviderImpl) Start() error {
} }
if hasNewNode { if hasNewNode {
score, err := s.node.Services().P2P().SortNodesByScore(s.availableNodes) score, err := s.services.P2P().SortNodesByScore(s.availableNodes)
if err != nil { if err != nil {
s.node.Logger().Error("Error sorting nodes by score", zap.Error(err)) s.logger.Error("Error sorting nodes by score", zap.Error(err))
} else { } else {
s.availableNodes = score s.availableNodes = score
} }
@ -299,7 +305,7 @@ func (s *StorageLocationProviderImpl) Next() (SignedStorageLocation, error) {
uri, exists := s.uris[nodIdStr] uri, exists := s.uris[nodIdStr]
if !exists { if !exists {
s.node.Logger().Error("Could not find uri for node id", zap.String("nodeId", nodIdStr)) s.logger.Error("Could not find uri for node id", zap.String("nodeId", nodIdStr))
continue continue
} }
@ -320,7 +326,7 @@ func (s *StorageLocationProviderImpl) Next() (SignedStorageLocation, error) {
} }
func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error { func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error {
err := s.node.Services().P2P().UpVote(uri.NodeId()) err := s.services.P2P().UpVote(uri.NodeId())
if err != nil { if err != nil {
return err return err
} }
@ -329,26 +335,27 @@ func (s *StorageLocationProviderImpl) Upvote(uri SignedStorageLocation) error {
} }
func (s *StorageLocationProviderImpl) Downvote(uri SignedStorageLocation) error { func (s *StorageLocationProviderImpl) Downvote(uri SignedStorageLocation) error {
err := s.node.Services().P2P().DownVote(uri.NodeId()) err := s.services.P2P().DownVote(uri.NodeId())
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func NewStorageLocationProvider(node *_node.Node, hash *encoding.Multihash, locationTypes ...types.StorageLocationType) StorageLocationProvider { func NewStorageLocationProvider(params StorageLocationProviderParams) *StorageLocationProviderImpl {
if locationTypes == nil { if params.LocationTypes == nil {
locationTypes = []types.StorageLocationType{ params.LocationTypes = []types.StorageLocationType{
types.StorageLocationTypeFull, types.StorageLocationTypeFull,
} }
} }
return &StorageLocationProviderImpl{ return &StorageLocationProviderImpl{
node: node, services: params.Services,
hash: hash, hash: params.Hash,
types: locationTypes, types: params.LocationTypes,
timeoutDuration: 60 * time.Second, timeoutDuration: 60 * time.Second,
uris: make(map[string]StorageLocation), uris: make(map[string]StorageLocation),
logger: params.Logger,
} }
} }
func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool { func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool {