refactoring: more refactoring to break import cycles

This commit is contained in:
Derrick Hammer 2024-01-29 21:32:13 -05:00
parent 0ee96599f1
commit bd08d75da4
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
13 changed files with 1449 additions and 1440 deletions

View File

@ -3,6 +3,7 @@ package fx
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/node"
"git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/service"
_default "git.lumeweb.com/LumeWeb/libs5-go/service/default"
"go.uber.org/fx" "go.uber.org/fx"
) )
@ -25,19 +26,19 @@ type ServicesParams struct {
node.ServicesParams node.ServicesParams
} }
func newP2P(params ServiceParams) *service.P2PService { func newP2P(params ServiceParams) service.P2PService {
return service.NewP2P(params.ServiceParams) return _default.NewP2P(params.ServiceParams)
} }
func newRegistry(params ServiceParams) *service.RegistryService { func newRegistry(params ServiceParams) service.RegistryService {
return service.NewRegistry(params.ServiceParams) return _default.NewRegistry(params.ServiceParams)
} }
func newHTTP(params ServiceParams) *service.HTTPService { func newHTTP(params ServiceParams) service.HTTPService {
return service.NewHTTP(params.ServiceParams) return _default.NewHTTP(params.ServiceParams)
} }
func newStorage(params ServiceParams) *service.StorageService { func newStorage(params ServiceParams) service.StorageService {
return service.NewStorage(params.ServiceParams) return _default.NewStorage(params.ServiceParams)
} }
func newServices(params ServicesParams) service.Services { func newServices(params ServicesParams) service.Services {

117
service/default/http.go Normal file
View File

@ -0,0 +1,117 @@
package _default
import (
"git.lumeweb.com/LumeWeb/libs5-go/build"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"github.com/julienschmidt/httprouter"
"go.sia.tech/jape"
"go.uber.org/zap"
"net/url"
"nhooyr.io/websocket"
)
var _ service.Service = (*HTTPServiceDefault)(nil)
type P2PNodesResponse struct {
Nodes []P2PNodeResponse `json:"nodes"`
}
type P2PNodeResponse struct {
Id string `json:"id"`
Uris []string `json:"uris"`
}
type HTTPServiceDefault struct {
service.ServiceBase
}
func NewHTTP(params service.ServiceParams) *HTTPServiceDefault {
return &HTTPServiceDefault{
ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db),
}
}
func (h *HTTPServiceDefault) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router {
routes := map[string]jape.Handler{
"GET /s5/version": h.versionHandler,
"GET /s5/p2p": h.p2pHandler,
"GET /s5/p2p/nodes": h.p2pNodesHandler,
}
for k, v := range inject {
routes[k] = v
}
return jape.Mux(routes)
}
func (h *HTTPServiceDefault) Start() error {
return nil
}
func (h *HTTPServiceDefault) Stop() error {
return nil
}
func (h *HTTPServiceDefault) Init() error {
return nil
}
func (h *HTTPServiceDefault) versionHandler(ctx jape.Context) {
_, _ = ctx.ResponseWriter.Write([]byte(build.Version))
}
func (h *HTTPServiceDefault) p2pHandler(ctx jape.Context) {
c, err := websocket.Accept(ctx.ResponseWriter, ctx.Request, nil)
if err != nil {
h.Logger().Error("error accepting websocket connection", zap.Error(err))
return
}
peer, err := net.CreateTransportPeer("wss", &net.TransportPeerConfig{
Socket: c,
Uris: []*url.URL{},
})
if err != nil {
h.Logger().Error("error creating transport peer", zap.Error(err))
err := c.Close(websocket.StatusInternalError, "the sky is falling")
if err != nil {
h.Logger().Error("error closing websocket connection", zap.Error(err))
}
return
}
h.Services().P2P().ConnectionTracker().Add(1)
go func() {
err := h.Services().P2P().OnNewPeer(peer, false)
if err != nil {
h.Logger().Error("error handling new peer", zap.Error(err))
}
h.Services().P2P().ConnectionTracker().Done()
}()
}
func (h *HTTPServiceDefault) p2pNodesHandler(ctx jape.Context) {
localId, err := h.Services().P2P().NodeId().ToString()
if ctx.Check("error getting local node id", err) != nil {
return
}
uris := h.Services().P2P().SelfConnectionUris()
nodeList := make([]P2PNodeResponse, len(uris))
for i, uri := range uris {
nodeList[i] = P2PNodeResponse{
Id: localId,
Uris: []string{uri.String()},
}
}
ctx.Encode(P2PNodesResponse{
Nodes: nodeList,
})
}

676
service/default/p2p.go Normal file
View File

@ -0,0 +1,676 @@
package _default
import (
"bytes"
"context"
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/vmihailenco/msgpack/v5"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"net/url"
"sort"
"sync"
"time"
)
var _ service.P2PService = (*P2PServiceDefault)(nil)
var (
errUnsupportedProtocol = errors.New("unsupported protocol")
errConnectionIdMissingNodeID = errors.New("connection id missing node id")
)
const nodeBucketName = "nodes"
type P2PServiceDefault struct {
nodeKeyPair *ed25519.KeyPairEd25519
localNodeID *encoding.NodeId
networkID string
nodesBucket *bolt.Bucket
inited bool
reconnectDelay structs.Map
peers structs.Map
peersPending structs.Map
selfConnectionUris []*url.URL
outgoingPeerBlocklist structs.Map
incomingPeerBlockList structs.Map
incomingIPBlocklist structs.Map
outgoingPeerFailures structs.Map
maxOutgoingPeerFailures uint
connections sync.WaitGroup
hashQueryRoutingTable structs.Map
service.ServiceBase
}
func NewP2P(params service.ServiceParams) *P2PServiceDefault {
uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", params.Config.HTTP.API.Domain, params.Config.HTTP.API.Port))
if err != nil {
params.Logger.Fatal("failed to parse HTTP API URL", zap.Error(err))
}
service := &P2PServiceDefault{
nodeKeyPair: params.Config.KeyPair,
networkID: params.Config.P2P.Network,
inited: false,
reconnectDelay: structs.NewMap(),
peers: structs.NewMap(),
peersPending: structs.NewMap(),
selfConnectionUris: []*url.URL{uri},
outgoingPeerBlocklist: structs.NewMap(),
incomingPeerBlockList: structs.NewMap(),
incomingIPBlocklist: structs.NewMap(),
outgoingPeerFailures: structs.NewMap(),
maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures,
hashQueryRoutingTable: structs.NewMap(),
ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db),
}
return service
}
func (p *P2PServiceDefault) SelfConnectionUris() []*url.URL {
return p.selfConnectionUris
}
func (p *P2PServiceDefault) Peers() structs.Map {
return p.peers
}
func (p *P2PServiceDefault) Start() error {
config := p.Config()
if len(config.P2P.Peers.Initial) > 0 {
initialPeers := config.P2P.Peers.Initial
for _, peer := range initialPeers {
u, err := url.Parse(peer)
if err != nil {
return err
}
peer := peer
go func() {
err := p.ConnectToNode([]*url.URL{u}, false, nil)
if err != nil {
p.Logger().Error("failed to connect to initial peer", zap.Error(err), zap.String("peer", peer))
}
}()
}
}
return nil
}
func (p *P2PServiceDefault) Stop() error {
return nil
}
func (p *P2PServiceDefault) Init() error {
if p.inited {
return nil
}
p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey())
err := utils.CreateBucket(nodeBucketName, p.Db())
if err != nil {
return err
}
p.inited = true
return nil
}
func (p *P2PServiceDefault) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error {
if !p.Services().IsStarted() {
return nil
}
unsupported, _ := url.Parse("http://0.0.0.0")
unsupported.Scheme = "unsupported"
var connectionUri *url.URL
for _, uri := range connectionUris {
if uri.Scheme == "ws" || uri.Scheme == "wss" {
connectionUri = uri
break
}
}
if connectionUri == nil {
for _, uri := range connectionUris {
if uri.Scheme == "tcp" {
connectionUri = uri
break
}
}
}
if connectionUri == nil {
connectionUri = unsupported
}
if connectionUri.Scheme == "unsupported" {
return errUnsupportedProtocol
}
scheme := connectionUri.Scheme
if connectionUri.User == nil {
return errConnectionIdMissingNodeID
}
username := connectionUri.User.Username()
id, err := encoding.DecodeNodeId(username)
if err != nil {
return err
}
idString, err := id.ToString()
if err != nil {
return err
}
if p.peersPending.Contains(idString) || p.peers.Contains(idString) {
p.Logger().Debug("already connected", zap.String("node", connectionUri.String()))
return nil
}
if p.outgoingPeerBlocklist.Contains(idString) {
p.Logger().Debug("outgoing peer is on blocklist", zap.String("node", connectionUri.String()))
var fromPeerId string
if fromPeer != nil {
blocked := false
if fromPeer.Id() != nil {
fromPeerId, err = fromPeer.Id().ToString()
if err != nil {
return err
}
if !p.incomingPeerBlockList.Contains(fromPeerId) {
p.incomingPeerBlockList.Put(fromPeerId, true)
blocked = true
}
}
fromPeerIP := fromPeer.GetIP()
if !p.incomingIPBlocklist.Contains(fromPeerIP) {
p.incomingIPBlocklist.Put(fromPeerIP, true)
blocked = true
}
err = fromPeer.EndForAbuse()
if err != nil {
return err
}
if blocked {
p.Logger().Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP))
}
}
return nil
}
reconnectDelay := p.reconnectDelay.GetUInt(idString)
if reconnectDelay == nil {
delay := uint(1)
reconnectDelay = &delay
}
if id.Equals(p.localNodeID) {
return nil
}
p.Logger().Debug("connect", zap.String("node", connectionUri.String()))
socket, err := net.CreateTransportSocket(scheme, connectionUri)
if err != nil {
if retried {
p.Logger().Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err))
counter := uint(0)
if p.outgoingPeerFailures.Contains(idString) {
tmp := *p.outgoingPeerFailures.GetUInt(idString)
counter = tmp
}
counter++
p.outgoingPeerFailures.PutUInt(idString, counter)
if counter >= p.maxOutgoingPeerFailures {
if fromPeer != nil {
blocked := false
var fromPeerId string
if fromPeer.Id() != nil {
fromPeerId, err = fromPeer.Id().ToString()
if err != nil {
return err
}
if !p.incomingPeerBlockList.Contains(fromPeerId) {
p.incomingPeerBlockList.Put(fromPeerId, true)
blocked = true
}
}
fromPeerIP := fromPeer.GetIP()
if !p.incomingIPBlocklist.Contains(fromPeerIP) {
p.incomingIPBlocklist.Put(fromPeerIP, true)
blocked = true
}
err = fromPeer.EndForAbuse()
if err != nil {
return err
}
if blocked {
p.Logger().Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP))
}
}
p.outgoingPeerBlocklist.Put(idString, true)
p.Logger().Debug("blocking peer for too many failures", zap.String("node", connectionUri.String()))
}
return nil
}
retried = true
p.Logger().Error("failed to connect", zap.String("node", connectionUri.String()), zap.Error(err))
delay := p.reconnectDelay.GetUInt(idString)
if delay == nil {
tmp := uint(1)
delay = &tmp
}
delayDeref := *delay
p.reconnectDelay.PutUInt(idString, delayDeref*2)
time.Sleep(time.Duration(delayDeref) * time.Second)
return p.ConnectToNode(connectionUris, retried, fromPeer)
}
if p.outgoingPeerFailures.Contains(idString) {
p.outgoingPeerFailures.Remove(idString)
}
peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{
Socket: socket,
Uris: []*url.URL{connectionUri},
})
if err != nil {
return err
}
peer.SetId(id)
p.Services().P2P().ConnectionTracker().Add(1)
peerId, err := peer.Id().ToString()
if err != nil {
return err
}
p.peersPending.Put(peerId, peer)
go func() {
err := p.OnNewPeer(peer, true)
if err != nil && !peer.Abuser() {
p.Logger().Error("peer error", zap.Error(err))
}
p.Services().P2P().ConnectionTracker().Done()
}()
return nil
}
func (p *P2PServiceDefault) OnNewPeer(peer net.Peer, verifyId bool) error {
var wg sync.WaitGroup
var pid string
if peer.Id() != nil {
pid, _ = peer.Id().ToString()
} else {
pid = "unknown"
}
pip := peer.GetIP()
if p.incomingIPBlocklist.Contains(pid) {
p.Logger().Error("peer is on identity blocklist", zap.String("peer", pid))
err := peer.EndForAbuse()
if err != nil {
return err
}
return nil
}
if p.incomingPeerBlockList.Contains(pip) {
p.Logger().Debug("peer is on ip blocklist", zap.String("peer", pid), zap.String("ip", pip))
err := peer.EndForAbuse()
if err != nil {
return err
}
return nil
}
p.Logger().Debug("OnNewPeer started", zap.String("peer", pid))
challenge := protocol.GenerateChallenge()
peer.SetChallenge(challenge)
wg.Add(1)
go func() {
defer wg.Done()
p.OnNewPeerListen(peer, verifyId)
}()
handshakeOpenMsg, err := msgpack.Marshal(protocol.NewHandshakeOpen(challenge, p.networkID))
if err != nil {
return err
}
err = peer.SendMessage(handshakeOpenMsg)
if err != nil {
return err
}
p.Logger().Debug("OnNewPeer sent handshake", zap.String("peer", pid))
p.Logger().Debug("OnNewPeer before Wait", zap.String("peer", pid))
wg.Wait() // Wait for OnNewPeerListen goroutine to finish
p.Logger().Debug("OnNewPeer ended", zap.String("peer", pid))
return nil
}
func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) {
onDone := net.CloseCallback(func() {
if peer.Id() != nil {
pid, err := peer.Id().ToString()
if err != nil {
p.Logger().Error("failed to get peer id", zap.Error(err))
return
}
// Handle closure of the connection
if p.peers.Contains(pid) {
p.peers.Remove(pid)
}
if p.peersPending.Contains(pid) {
p.peersPending.Remove(pid)
}
}
})
onError := net.ErrorCallback(func(args ...interface{}) {
if !peer.Abuser() {
p.Logger().Error("peer error", zap.Any("args", args))
}
})
peer.ListenForMessages(func(message []byte) error {
var reader base.IncomingMessageReader
err := msgpack.Unmarshal(message, &reader)
if err != nil {
p.Logger().Error("Error decoding basic message info", zap.Error(err))
return err
}
// Now, get the specific message handler based on the message kind
handler, ok := protocol.GetMessageType(reader.Kind)
if !ok {
p.Logger().Error("Unknown message type", zap.Int("type", reader.Kind))
return fmt.Errorf("unknown message type: %d", reader.Kind)
}
data := base.IncomingMessageData{
Original: message,
Data: reader.Data,
Ctx: context.Background(),
Peer: peer,
VerifyId: verifyId,
Config: p.Config(),
}
dec := msgpack.NewDecoder(bytes.NewReader(reader.Data))
err = handler.DecodeMessage(dec, data)
if err != nil {
p.Logger().Error("Error decoding message", zap.Error(err))
return err
}
// Directly decode and handle the specific message type
if err := handler.HandleMessage(data); err != nil {
p.Logger().Error("Error handling message", zap.Error(err))
return err
}
return nil
}, net.ListenerOptions{
OnClose: &onDone,
OnError: &onError,
Logger: p.Logger(),
})
}
func (p *P2PServiceDefault) readNodeVotes(nodeId *encoding.NodeId) (service.NodeVotes, error) {
var value []byte
var found bool
err := p.Db().View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(nodeBucketName))
if b == nil {
return fmt.Errorf("Bucket %s not found", nodeBucketName)
}
value = b.Get(nodeId.Raw())
if value == nil {
return nil
}
found = true
return nil
})
if err != nil {
return nil, err
}
if !found {
return service.NewNodeVotes(), nil
}
var score service.NodeVotes
err = msgpack.Unmarshal(value, &score)
if err != nil {
return nil, err
}
return score, nil
}
func (p *P2PServiceDefault) saveNodeVotes(nodeId *encoding.NodeId, votes service.NodeVotes) error {
// Marshal the votes into data
data, err := msgpack.Marshal(votes)
if err != nil {
return err
}
// Use a transaction to save the data
err = p.Db().Update(func(tx *bolt.Tx) error {
// Get or create the bucket
b := tx.Bucket([]byte(nodeBucketName))
// Put the data into the bucket
return b.Put(nodeId.Raw(), data)
})
return err
}
func (p *P2PServiceDefault) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {
if nodeId.Equals(p.localNodeID) {
return 1, nil
}
score, err := p.readNodeVotes(nodeId)
if err != nil {
return 0.5, err
}
return protocol.CalculateNodeScore(score.Good(), score.Bad()), nil
}
func (p *P2PServiceDefault) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) {
scores := make(map[encoding.NodeIdCode]float64)
var errOccurred error
for _, nodeId := range nodes {
score, err := p.GetNodeScore(nodeId)
if err != nil {
errOccurred = err
scores[nodeId.HashCode()] = 0 // You may choose a different default value for error cases
} else {
scores[nodeId.HashCode()] = score
}
}
sort.Slice(nodes, func(i, j int) bool {
return scores[nodes[i].HashCode()] > scores[nodes[j].HashCode()]
})
return nodes, errOccurred
}
func (p *P2PServiceDefault) SignMessageSimple(message []byte) ([]byte, error) {
signedMessage := signed.NewSignedMessageRequest(message)
signedMessage.SetNodeId(p.localNodeID)
err := signedMessage.Sign(p.Config())
if err != nil {
return nil, err
}
result, err := msgpack.Marshal(signedMessage)
if err != nil {
return nil, err
}
return result, nil
}
func (p *P2PServiceDefault) AddPeer(peer net.Peer) error {
peerId, err := peer.Id().ToString()
if err != nil {
return err
}
p.peers.Put(peerId, peer)
p.reconnectDelay.PutUInt(peerId, 1)
if p.peersPending.Contains(peerId) {
p.peersPending.Remove(peerId)
}
return nil
}
func (p *P2PServiceDefault) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error {
announceRequest := signed.NewAnnounceRequest(peer, peersToSend)
message, err := msgpack.Marshal(announceRequest)
if err != nil {
return err
}
signedMessage, err := p.SignMessageSimple(message)
if err != nil {
return err
}
err = peer.SendMessage(signedMessage)
return nil
}
func (p *P2PServiceDefault) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error {
hashRequest := protocol.NewHashRequest(hash, kinds)
message, err := msgpack.Marshal(hashRequest)
if err != nil {
return err
}
for _, peer := range p.peers.Values() {
peerValue, ok := peer.(net.Peer)
if !ok {
p.Logger().Error("failed to cast peer to net.Peer")
continue
}
err = peerValue.SendMessage(message)
}
return nil
}
func (p *P2PServiceDefault) UpVote(nodeId *encoding.NodeId) error {
err := p.vote(nodeId, true)
if err != nil {
return err
}
return nil
}
func (p *P2PServiceDefault) DownVote(nodeId *encoding.NodeId) error {
err := p.vote(nodeId, false)
if err != nil {
return err
}
return nil
}
func (p *P2PServiceDefault) vote(nodeId *encoding.NodeId, upvote bool) error {
votes, err := p.readNodeVotes(nodeId)
if err != nil {
return err
}
if upvote {
votes.Upvote()
} else {
votes.Downvote()
}
err = p.saveNodeVotes(nodeId, votes)
if err != nil {
return err
}
return nil
}
func (p *P2PServiceDefault) NodeId() *encoding.NodeId {
return p.localNodeID
}
func (p *P2PServiceDefault) WaitOnConnectedPeers() {
p.connections.Wait()
}
func (p *P2PServiceDefault) ConnectionTracker() *sync.WaitGroup {
return &p.connections
}
func (p *P2PServiceDefault) NetworkId() string {
return p.Config().P2P.Network
}
func (n *P2PServiceDefault) HashQueryRoutingTable() structs.Map {
return n.hashQueryRoutingTable
}

311
service/default/registry.go Normal file
View File

@ -0,0 +1,311 @@
package _default
import (
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/olebedev/emitter"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
)
const registryBucketName = "registry"
var (
_ service.Service = (*RegistryServiceDefault)(nil)
_ service.RegistryService = (*RegistryServiceDefault)(nil)
)
type RegistryServiceDefault struct {
streams structs.Map
subs structs.Map
service.ServiceBase
}
func (r *RegistryServiceDefault) Start() error {
return nil
}
func (r *RegistryServiceDefault) Stop() error {
return nil
}
func (r *RegistryServiceDefault) Init() error {
return utils.CreateBucket(registryBucketName, r.Db())
}
func NewRegistry(params service.ServiceParams) *RegistryServiceDefault {
return &RegistryServiceDefault{
streams: structs.NewMap(),
subs: structs.NewMap(),
ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db),
}
}
func (r *RegistryServiceDefault) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
return err
}
pid, err := receivedFrom.Id().ToString()
if err != nil {
return err
}
r.Logger().Debug("[registry] set", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))
if !trusted {
if len(sre.PK()) != 33 {
return errors.New("Invalid pubkey")
}
if int(sre.PK()[0]) != int(types.HashTypeEd25519) {
return errors.New("Only ed25519 keys are supported")
}
if sre.Revision() < 0 || sre.Revision() > 281474976710656 {
return errors.New("Invalid revision")
}
if len(sre.Data()) > types.RegistryMaxDataSize {
return errors.New("Data too long")
}
if !sre.Verify() {
return errors.New("Invalid signature found")
}
}
existingEntry, err := r.getFromDB(sre.PK())
if err != nil {
return err
}
if existingEntry != nil {
if receivedFrom != nil {
if existingEntry.Revision() == sre.Revision() {
return nil
} else if existingEntry.Revision() > sre.Revision() {
updateMessage := protocol.MarshalSignedRegistryEntry(existingEntry)
err := receivedFrom.SendMessage(updateMessage)
if err != nil {
return err
}
return nil
}
}
if existingEntry.Revision() >= sre.Revision() {
return errors.New("Revision number too low")
}
}
key := encoding.NewMultihash(sre.PK())
keyString, err := key.ToString()
if err != nil {
return err
}
eventObj, ok := r.streams.Get(keyString)
if ok {
event := eventObj.(*emitter.Emitter)
go event.Emit("fire", sre)
}
err = r.Db().Update(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre))
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = r.BroadcastEntry(sre, receivedFrom)
if err != nil {
return err
}
return nil
}
func (r *RegistryServiceDefault) BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
return err
}
pid, err := receivedFrom.Id().ToString()
if err != nil {
return err
}
r.Logger().Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))
updateMessage := protocol.MarshalSignedRegistryEntry(sre)
for _, p := range r.Services().P2P().Peers().Values() {
peer, ok := p.(net.Peer)
if !ok {
continue
}
if receivedFrom == nil || peer.Id().Equals(receivedFrom.Id()) {
err := peer.SendMessage(updateMessage)
if err != nil {
pid, err := peer.Id().ToString()
if err != nil {
return err
}
r.Logger().Error("Failed to send registry broadcast", zap.String("peer", pid), zap.Error(err))
return err
}
}
}
return nil
}
func (r *RegistryServiceDefault) SendRegistryRequest(pk []byte) error {
query := protocol.NewRegistryQuery(pk)
request, err := msgpack.Marshal(query)
if err != nil {
return err
}
// Iterate over peers and send the request
for _, peerVal := range r.Services().P2P().Peers().Values() {
peer, ok := peerVal.(net.Peer)
if !ok {
continue
}
err := peer.SendMessage(request)
if err != nil {
pid, err := peer.Id().ToString()
if err != nil {
return err
}
r.Logger().Error("Failed to send registry request", zap.String("peer", pid), zap.Error(err))
return err
}
}
return nil
}
func (r *RegistryServiceDefault) Get(pk []byte) (protocol.SignedRegistryEntry, error) {
key := encoding.NewMultihash(pk)
keyString, err := key.ToString()
if err != nil {
return nil, err
}
if r.subs.Contains(keyString) {
r.Logger().Debug("[registry] get (cached)", zap.String("key", keyString))
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
return res, nil
}
err = r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
time.Sleep(200 * time.Millisecond)
return r.getFromDB(pk)
}
err = r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
r.subs.Put(keyString, key)
if !r.streams.Contains(keyString) {
event := &emitter.Emitter{}
r.streams.Put(keyString, event)
}
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
return res, nil
}
if res == nil {
r.Logger().Debug("[registry] get (cached)", zap.String("key", keyString))
for i := 0; i < 200; i++ {
time.Sleep(10 * time.Millisecond)
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
break
}
}
}
return nil, nil
}
func (r *RegistryServiceDefault) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) {
key, err := encoding.NewMultihash(pk).ToString()
if err != nil {
return nil, err
}
cbProxy := func(event *emitter.Event) {
sre, ok := event.Args[0].(protocol.SignedRegistryEntry)
if !ok {
r.Logger().Error("Failed to cast event to SignedRegistryEntry")
return
}
cb(sre)
}
if !r.streams.Contains(key) {
em := emitter.New(0)
r.streams.Put(key, em)
err := r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
}
streamVal, _ := r.streams.Get(key)
stream := streamVal.(*emitter.Emitter)
channel := stream.On("fire", cbProxy)
return func() {
stream.Off("fire", channel)
}, nil
}
func (r *RegistryServiceDefault) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) {
err = r.Db().View(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
val := bucket.Get(pk)
if val != nil {
entry, err := protocol.UnmarshalSignedRegistryEntry(val)
if err != nil {
return err
}
sre = entry
return nil
}
return nil
})
if err != nil {
return nil, err
}
return sre, nil
}

View File

@ -0,0 +1 @@
package _default

297
service/default/storage.go Normal file
View File

@ -0,0 +1,297 @@
package _default
import (
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/storage/provider"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/go-resty/resty/v2"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
)
const cacheBucketName = "object-cache"
var (
_ service.Service = (*StorageService)(nil)
_ service.StorageService = (*StorageService)(nil)
)
type StorageService struct {
httpClient *resty.Client
metadataCache structs.Map
providerStore storage.ProviderStore
service.ServiceBase
}
func NewStorage(params service.ServiceParams) *StorageService {
return &StorageService{
httpClient: resty.New(),
metadataCache: structs.NewMap(),
ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db),
}
}
func (s *StorageService) Start() error {
err :=
utils.CreateBucket(cacheBucketName, s.Db())
if err != nil {
return err
}
return nil
}
func (s *StorageService) Stop() error {
return nil
}
func (s *StorageService) Init() error {
return nil
}
func (n *StorageService) SetProviderStore(store storage.ProviderStore) {
n.providerStore = store
}
func (n *StorageService) ProviderStore() storage.ProviderStore {
return n.providerStore
}
func (s *StorageService) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) {
locations := make(map[string]storage.StorageLocation)
locationMap, err := s.readStorageLocationsFromDB(hash)
if err != nil {
return nil, err
}
if len(locationMap) == 0 {
return make(map[string]storage.StorageLocation), nil
}
ts := time.Now().Unix()
for _, t := range kinds {
nodeMap, ok := (locationMap)[int(t)]
if !ok {
continue
}
for key, value := range nodeMap {
expiry, ok := value[3].(int64)
if !ok || expiry < ts {
continue
}
addressesInterface, ok := value[1].([]interface{})
if !ok {
continue
}
// Create a slice to hold the strings
addresses := make([]string, len(addressesInterface))
// Convert each element to string
for i, v := range addressesInterface {
str, ok := v.(string)
if !ok {
// Handle the error, maybe skip this element or set a default value
continue
}
addresses[i] = str
}
storageLocation := storage.NewStorageLocation(int(t), addresses, expiry)
if providerMessage, ok := value[4].([]byte); ok {
(storageLocation).SetProviderMessage(providerMessage)
}
locations[key] = storageLocation
}
}
return locations, nil
}
func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
var locationMap storage.StorageLocationMap
err := s.Db().View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name
if b == nil {
return fmt.Errorf("bucket %s not found", cacheBucketName)
}
bytes := b.Get(hash.FullBytes())
if bytes == nil {
// If no data found, return an empty locationMap but no error
locationMap = storage.NewStorageLocationMap()
return nil
}
return msgpack.Unmarshal(bytes, &locationMap)
})
if err != nil {
return nil, err
}
return locationMap, nil
}
func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error {
// Read existing storage locations
locationDb, err := s.readStorageLocationsFromDB(hash)
if err != nil {
return err
}
nodeIdStr, err := nodeId.ToString()
if err != nil {
return err
}
// Get or create the inner map for the specific type
innerMap, exists := locationDb[location.Type()]
if !exists {
innerMap = make(storage.NodeStorage, 1)
innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1)
}
// Create location map with new data
locationMap := make(map[int]interface{}, 3)
locationMap[1] = location.Parts()
locationMap[3] = location.Expiry()
locationMap[4] = message
// Update the inner map with the new location
innerMap[nodeIdStr] = locationMap
locationDb[location.Type()] = innerMap
// Serialize the updated map and store it in the database
packedBytes, err := msgpack.Marshal(locationDb)
if err != nil {
return err
}
err = s.Db().Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName))
return b.Put(hash.FullBytes(), packedBytes)
})
if err != nil {
return err
}
return nil
}
func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
// Initialize the download URI provider
dlUriProvider := provider.NewStorageLocationProvider(provider.StorageLocationProviderParams{
Services: s.Services(),
Hash: hash,
LocationTypes: []types.StorageLocationType{
types.StorageLocationTypeFull,
types.StorageLocationTypeFile,
},
Logger: s.Logger(),
Config: s.Config(),
Db: s.Db(),
})
err := dlUriProvider.Start()
if err != nil {
return nil, err
}
retryCount := 0
for {
dlUri, err := dlUriProvider.Next()
if err != nil {
return nil, err
}
s.Logger().Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL()))
res, err := s.httpClient.R().Get(dlUri.Location().BytesURL())
if err != nil {
err := dlUriProvider.Downvote(dlUri)
if err != nil {
return nil, err
}
retryCount++
if retryCount > 32 {
return nil, errors.New("too many retries")
}
continue
}
bodyBytes := res.Body()
return bodyBytes, nil
}
}
func (s *StorageService) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) {
bytes, err = s.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
return bytes, nil
}
func (s *StorageService) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) {
hashStr, err := cid.Hash.ToString()
if err != nil {
return nil, err
}
if s.metadataCache.Contains(hashStr) {
md, _ := s.metadataCache.Get(hashStr)
return md.(metadata.Metadata), nil
}
bytes, err := s.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
switch cid.Type {
case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method
md = metadata.NewEmptyMediaMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeMetadataWebapp:
md = metadata.NewEmptyWebAppMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeDirectory:
md = metadata.NewEmptyDirectoryMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported metadata format")
}
s.metadataCache.Put(hashStr, md)
return md, nil
}

View File

@ -1,124 +1,11 @@
package service package service
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/build"
"git.lumeweb.com/LumeWeb/libs5-go/net"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
"go.sia.tech/jape" "go.sia.tech/jape"
"go.uber.org/zap"
"net/url"
"nhooyr.io/websocket"
) )
var _ Service = (*HTTPService)(nil) type HTTPService interface {
type P2PNodesResponse struct {
Nodes []P2PNodeResponse `json:"nodes"`
}
type P2PNodeResponse struct {
Id string `json:"id"`
Uris []string `json:"uris"`
}
type HTTPServiceInterface interface {
GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router
} Service
type HTTPService struct {
ServiceBase
}
func NewHTTP(params ServiceParams) *HTTPService {
return &HTTPService{
ServiceBase: ServiceBase{
logger: params.Logger,
config: params.Config,
db: params.Db,
},
}
}
func (h *HTTPService) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router {
routes := map[string]jape.Handler{
"GET /s5/version": h.versionHandler,
"GET /s5/p2p": h.p2pHandler,
"GET /s5/p2p/nodes": h.p2pNodesHandler,
}
for k, v := range inject {
routes[k] = v
}
return jape.Mux(routes)
}
func (h *HTTPService) Start() error {
return nil
}
func (h *HTTPService) Stop() error {
return nil
}
func (h *HTTPService) Init() error {
return nil
}
func (h *HTTPService) versionHandler(ctx jape.Context) {
_, _ = ctx.ResponseWriter.Write([]byte(build.Version))
}
func (h *HTTPService) p2pHandler(ctx jape.Context) {
c, err := websocket.Accept(ctx.ResponseWriter, ctx.Request, nil)
if err != nil {
h.logger.Error("error accepting websocket connection", zap.Error(err))
return
}
peer, err := net.CreateTransportPeer("wss", &net.TransportPeerConfig{
Socket: c,
Uris: []*url.URL{},
})
if err != nil {
h.logger.Error("error creating transport peer", zap.Error(err))
err := c.Close(websocket.StatusInternalError, "the sky is falling")
if err != nil {
h.logger.Error("error closing websocket connection", zap.Error(err))
}
return
}
h.services.P2P().ConnectionTracker().Add(1)
go func() {
err := h.services.P2P().OnNewPeer(peer, false)
if err != nil {
h.logger.Error("error handling new peer", zap.Error(err))
}
h.services.P2P().ConnectionTracker().Done()
}()
}
func (h *HTTPService) p2pNodesHandler(ctx jape.Context) {
localId, err := h.services.P2P().NodeId().ToString()
if ctx.Check("error getting local node id", err) != nil {
return
}
uris := h.services.P2P().SelfConnectionUris()
nodeList := make([]P2PNodeResponse, len(uris))
for i, uri := range uris {
nodeList[i] = P2PNodeResponse{
Id: localId,
Uris: []string{uri.String()},
}
}
ctx.Encode(P2PNodesResponse{
Nodes: nodeList,
})
} }

View File

@ -1,41 +1,15 @@
package service package service
import ( import (
"bytes"
"context"
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/ed25519"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/base"
"git.lumeweb.com/LumeWeb/libs5-go/protocol/signed"
"git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/vmihailenco/msgpack/v5"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"net/url" "net/url"
"sort"
"sync" "sync"
"time"
) )
var _ Service = (*P2PService)(nil) type P2PService interface {
var _ storage.StorageLocationProviderP2PService = (*P2PService)(nil)
var _ P2PServiceInterface = (*P2PService)(nil)
var (
errUnsupportedProtocol = errors.New("unsupported protocol")
errConnectionIdMissingNodeID = errors.New("connection id missing node id")
)
const nodeBucketName = "nodes"
type P2PServiceInterface interface {
SelfConnectionUris() []*url.URL SelfConnectionUris() []*url.URL
Peers() structs.Map Peers() structs.Map
ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error
@ -53,650 +27,5 @@ type P2PServiceInterface interface {
ConnectionTracker() *sync.WaitGroup ConnectionTracker() *sync.WaitGroup
NetworkId() string NetworkId() string
HashQueryRoutingTable() structs.Map HashQueryRoutingTable() structs.Map
} Service
type P2PService struct {
nodeKeyPair *ed25519.KeyPairEd25519
localNodeID *encoding.NodeId
networkID string
nodesBucket *bolt.Bucket
inited bool
reconnectDelay structs.Map
peers structs.Map
peersPending structs.Map
selfConnectionUris []*url.URL
outgoingPeerBlocklist structs.Map
incomingPeerBlockList structs.Map
incomingIPBlocklist structs.Map
outgoingPeerFailures structs.Map
maxOutgoingPeerFailures uint
connections sync.WaitGroup
hashQueryRoutingTable structs.Map
ServiceBase
}
func NewP2P(params ServiceParams) *P2PService {
uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", params.Config.HTTP.API.Domain, params.Config.HTTP.API.Port))
if err != nil {
params.Logger.Fatal("failed to parse HTTP API URL", zap.Error(err))
}
service := &P2PService{
nodeKeyPair: params.Config.KeyPair,
networkID: params.Config.P2P.Network,
inited: false,
reconnectDelay: structs.NewMap(),
peers: structs.NewMap(),
peersPending: structs.NewMap(),
selfConnectionUris: []*url.URL{uri},
outgoingPeerBlocklist: structs.NewMap(),
incomingPeerBlockList: structs.NewMap(),
incomingIPBlocklist: structs.NewMap(),
outgoingPeerFailures: structs.NewMap(),
maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures,
hashQueryRoutingTable: structs.NewMap(),
ServiceBase: ServiceBase{
logger: params.Logger,
config: params.Config,
db: params.Db,
},
}
return service
}
func (p *P2PService) SelfConnectionUris() []*url.URL {
return p.selfConnectionUris
}
func (p *P2PService) Peers() structs.Map {
return p.peers
}
func (p *P2PService) Start() error {
config := p.config
if len(config.P2P.Peers.Initial) > 0 {
initialPeers := config.P2P.Peers.Initial
for _, peer := range initialPeers {
u, err := url.Parse(peer)
if err != nil {
return err
}
peer := peer
go func() {
err := p.ConnectToNode([]*url.URL{u}, false, nil)
if err != nil {
p.logger.Error("failed to connect to initial peer", zap.Error(err), zap.String("peer", peer))
}
}()
}
}
return nil
}
func (p *P2PService) Stop() error {
return nil
}
func (p *P2PService) Init() error {
if p.inited {
return nil
}
p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey())
err := utils.CreateBucket(nodeBucketName, p.db)
if err != nil {
return err
}
p.inited = true
return nil
}
func (p *P2PService) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error {
if !p.services.IsStarted() {
return nil
}
unsupported, _ := url.Parse("http://0.0.0.0")
unsupported.Scheme = "unsupported"
var connectionUri *url.URL
for _, uri := range connectionUris {
if uri.Scheme == "ws" || uri.Scheme == "wss" {
connectionUri = uri
break
}
}
if connectionUri == nil {
for _, uri := range connectionUris {
if uri.Scheme == "tcp" {
connectionUri = uri
break
}
}
}
if connectionUri == nil {
connectionUri = unsupported
}
if connectionUri.Scheme == "unsupported" {
return errUnsupportedProtocol
}
scheme := connectionUri.Scheme
if connectionUri.User == nil {
return errConnectionIdMissingNodeID
}
username := connectionUri.User.Username()
id, err := encoding.DecodeNodeId(username)
if err != nil {
return err
}
idString, err := id.ToString()
if err != nil {
return err
}
if p.peersPending.Contains(idString) || p.peers.Contains(idString) {
p.logger.Debug("already connected", zap.String("node", connectionUri.String()))
return nil
}
if p.outgoingPeerBlocklist.Contains(idString) {
p.logger.Debug("outgoing peer is on blocklist", zap.String("node", connectionUri.String()))
var fromPeerId string
if fromPeer != nil {
blocked := false
if fromPeer.Id() != nil {
fromPeerId, err = fromPeer.Id().ToString()
if err != nil {
return err
}
if !p.incomingPeerBlockList.Contains(fromPeerId) {
p.incomingPeerBlockList.Put(fromPeerId, true)
blocked = true
}
}
fromPeerIP := fromPeer.GetIP()
if !p.incomingIPBlocklist.Contains(fromPeerIP) {
p.incomingIPBlocklist.Put(fromPeerIP, true)
blocked = true
}
err = fromPeer.EndForAbuse()
if err != nil {
return err
}
if blocked {
p.logger.Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP))
}
}
return nil
}
reconnectDelay := p.reconnectDelay.GetUInt(idString)
if reconnectDelay == nil {
delay := uint(1)
reconnectDelay = &delay
}
if id.Equals(p.localNodeID) {
return nil
}
p.logger.Debug("connect", zap.String("node", connectionUri.String()))
socket, err := net.CreateTransportSocket(scheme, connectionUri)
if err != nil {
if retried {
p.logger.Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err))
counter := uint(0)
if p.outgoingPeerFailures.Contains(idString) {
tmp := *p.outgoingPeerFailures.GetUInt(idString)
counter = tmp
}
counter++
p.outgoingPeerFailures.PutUInt(idString, counter)
if counter >= p.maxOutgoingPeerFailures {
if fromPeer != nil {
blocked := false
var fromPeerId string
if fromPeer.Id() != nil {
fromPeerId, err = fromPeer.Id().ToString()
if err != nil {
return err
}
if !p.incomingPeerBlockList.Contains(fromPeerId) {
p.incomingPeerBlockList.Put(fromPeerId, true)
blocked = true
}
}
fromPeerIP := fromPeer.GetIP()
if !p.incomingIPBlocklist.Contains(fromPeerIP) {
p.incomingIPBlocklist.Put(fromPeerIP, true)
blocked = true
}
err = fromPeer.EndForAbuse()
if err != nil {
return err
}
if blocked {
p.logger.Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP))
}
}
p.outgoingPeerBlocklist.Put(idString, true)
p.logger.Debug("blocking peer for too many failures", zap.String("node", connectionUri.String()))
}
return nil
}
retried = true
p.logger.Error("failed to connect", zap.String("node", connectionUri.String()), zap.Error(err))
delay := p.reconnectDelay.GetUInt(idString)
if delay == nil {
tmp := uint(1)
delay = &tmp
}
delayDeref := *delay
p.reconnectDelay.PutUInt(idString, delayDeref*2)
time.Sleep(time.Duration(delayDeref) * time.Second)
return p.ConnectToNode(connectionUris, retried, fromPeer)
}
if p.outgoingPeerFailures.Contains(idString) {
p.outgoingPeerFailures.Remove(idString)
}
peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{
Socket: socket,
Uris: []*url.URL{connectionUri},
})
if err != nil {
return err
}
peer.SetId(id)
p.services.P2P().ConnectionTracker().Add(1)
peerId, err := peer.Id().ToString()
if err != nil {
return err
}
p.peersPending.Put(peerId, peer)
go func() {
err := p.OnNewPeer(peer, true)
if err != nil && !peer.Abuser() {
p.logger.Error("peer error", zap.Error(err))
}
p.services.P2P().ConnectionTracker().Done()
}()
return nil
}
func (p *P2PService) OnNewPeer(peer net.Peer, verifyId bool) error {
var wg sync.WaitGroup
var pid string
if peer.Id() != nil {
pid, _ = peer.Id().ToString()
} else {
pid = "unknown"
}
pip := peer.GetIP()
if p.incomingIPBlocklist.Contains(pid) {
p.logger.Error("peer is on identity blocklist", zap.String("peer", pid))
err := peer.EndForAbuse()
if err != nil {
return err
}
return nil
}
if p.incomingPeerBlockList.Contains(pip) {
p.logger.Debug("peer is on ip blocklist", zap.String("peer", pid), zap.String("ip", pip))
err := peer.EndForAbuse()
if err != nil {
return err
}
return nil
}
p.logger.Debug("OnNewPeer started", zap.String("peer", pid))
challenge := protocol.GenerateChallenge()
peer.SetChallenge(challenge)
wg.Add(1)
go func() {
defer wg.Done()
p.OnNewPeerListen(peer, verifyId)
}()
handshakeOpenMsg, err := msgpack.Marshal(protocol.NewHandshakeOpen(challenge, p.networkID))
if err != nil {
return err
}
err = peer.SendMessage(handshakeOpenMsg)
if err != nil {
return err
}
p.logger.Debug("OnNewPeer sent handshake", zap.String("peer", pid))
p.logger.Debug("OnNewPeer before Wait", zap.String("peer", pid))
wg.Wait() // Wait for OnNewPeerListen goroutine to finish
p.logger.Debug("OnNewPeer ended", zap.String("peer", pid))
return nil
}
func (p *P2PService) OnNewPeerListen(peer net.Peer, verifyId bool) {
onDone := net.CloseCallback(func() {
if peer.Id() != nil {
pid, err := peer.Id().ToString()
if err != nil {
p.logger.Error("failed to get peer id", zap.Error(err))
return
}
// Handle closure of the connection
if p.peers.Contains(pid) {
p.peers.Remove(pid)
}
if p.peersPending.Contains(pid) {
p.peersPending.Remove(pid)
}
}
})
onError := net.ErrorCallback(func(args ...interface{}) {
if !peer.Abuser() {
p.logger.Error("peer error", zap.Any("args", args))
}
})
peer.ListenForMessages(func(message []byte) error {
var reader base.IncomingMessageReader
err := msgpack.Unmarshal(message, &reader)
if err != nil {
p.logger.Error("Error decoding basic message info", zap.Error(err))
return err
}
// Now, get the specific message handler based on the message kind
handler, ok := protocol.GetMessageType(reader.Kind)
if !ok {
p.logger.Error("Unknown message type", zap.Int("type", reader.Kind))
return fmt.Errorf("unknown message type: %d", reader.Kind)
}
data := base.IncomingMessageData{
Original: message,
Data: reader.Data,
Ctx: context.Background(),
Peer: peer,
VerifyId: verifyId,
Config: p.config,
}
dec := msgpack.NewDecoder(bytes.NewReader(reader.Data))
err = handler.DecodeMessage(dec, data)
if err != nil {
p.logger.Error("Error decoding message", zap.Error(err))
return err
}
// Directly decode and handle the specific message type
if err := handler.HandleMessage(data); err != nil {
p.logger.Error("Error handling message", zap.Error(err))
return err
}
return nil
}, net.ListenerOptions{
OnClose: &onDone,
OnError: &onError,
Logger: p.logger,
})
}
func (p *P2PService) readNodeVotes(nodeId *encoding.NodeId) (NodeVotes, error) {
var value []byte
var found bool
err := p.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(nodeBucketName))
if b == nil {
return fmt.Errorf("Bucket %s not found", nodeBucketName)
}
value = b.Get(nodeId.Raw())
if value == nil {
return nil
}
found = true
return nil
})
if err != nil {
return nil, err
}
if !found {
return NewNodeVotes(), nil
}
var score NodeVotes
err = msgpack.Unmarshal(value, &score)
if err != nil {
return nil, err
}
return score, nil
}
func (p *P2PService) saveNodeVotes(nodeId *encoding.NodeId, votes NodeVotes) error {
// Marshal the votes into data
data, err := msgpack.Marshal(votes)
if err != nil {
return err
}
// Use a transaction to save the data
err = p.db.Update(func(tx *bolt.Tx) error {
// Get or create the bucket
b := tx.Bucket([]byte(nodeBucketName))
// Put the data into the bucket
return b.Put(nodeId.Raw(), data)
})
return err
}
func (p *P2PService) GetNodeScore(nodeId *encoding.NodeId) (float64, error) {
if nodeId.Equals(p.localNodeID) {
return 1, nil
}
score, err := p.readNodeVotes(nodeId)
if err != nil {
return 0.5, err
}
return protocol.CalculateNodeScore(score.Good(), score.Bad()), nil
}
func (p *P2PService) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) {
scores := make(map[encoding.NodeIdCode]float64)
var errOccurred error
for _, nodeId := range nodes {
score, err := p.GetNodeScore(nodeId)
if err != nil {
errOccurred = err
scores[nodeId.HashCode()] = 0 // You may choose a different default value for error cases
} else {
scores[nodeId.HashCode()] = score
}
}
sort.Slice(nodes, func(i, j int) bool {
return scores[nodes[i].HashCode()] > scores[nodes[j].HashCode()]
})
return nodes, errOccurred
}
func (p *P2PService) SignMessageSimple(message []byte) ([]byte, error) {
signedMessage := signed.NewSignedMessageRequest(message)
signedMessage.SetNodeId(p.localNodeID)
err := signedMessage.Sign(p.config)
if err != nil {
return nil, err
}
result, err := msgpack.Marshal(signedMessage)
if err != nil {
return nil, err
}
return result, nil
}
func (p *P2PService) AddPeer(peer net.Peer) error {
peerId, err := peer.Id().ToString()
if err != nil {
return err
}
p.peers.Put(peerId, peer)
p.reconnectDelay.PutUInt(peerId, 1)
if p.peersPending.Contains(peerId) {
p.peersPending.Remove(peerId)
}
return nil
}
func (p *P2PService) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error {
announceRequest := signed.NewAnnounceRequest(peer, peersToSend)
message, err := msgpack.Marshal(announceRequest)
if err != nil {
return err
}
signedMessage, err := p.SignMessageSimple(message)
if err != nil {
return err
}
err = peer.SendMessage(signedMessage)
return nil
}
func (p *P2PService) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error {
hashRequest := protocol.NewHashRequest(hash, kinds)
message, err := msgpack.Marshal(hashRequest)
if err != nil {
return err
}
for _, peer := range p.peers.Values() {
peerValue, ok := peer.(net.Peer)
if !ok {
p.logger.Error("failed to cast peer to net.Peer")
continue
}
err = peerValue.SendMessage(message)
}
return nil
}
func (p *P2PService) UpVote(nodeId *encoding.NodeId) error {
err := p.vote(nodeId, true)
if err != nil {
return err
}
return nil
}
func (p *P2PService) DownVote(nodeId *encoding.NodeId) error {
err := p.vote(nodeId, false)
if err != nil {
return err
}
return nil
}
func (p *P2PService) vote(nodeId *encoding.NodeId, upvote bool) error {
votes, err := p.readNodeVotes(nodeId)
if err != nil {
return err
}
if upvote {
votes.Upvote()
} else {
votes.Downvote()
}
err = p.saveNodeVotes(nodeId, votes)
if err != nil {
return err
}
return nil
}
func (p *P2PService) NodeId() *encoding.NodeId {
return p.localNodeID
}
func (p *P2PService) WaitOnConnectedPeers() {
p.connections.Wait()
}
func (p *P2PService) ConnectionTracker() *sync.WaitGroup {
return &p.connections
}
func (p *P2PService) NetworkId() string {
return p.config.P2P.Network
}
func (n *P2PService) HashQueryRoutingTable() structs.Map {
return n.hashQueryRoutingTable
} }

View File

@ -1,322 +1,15 @@
package service package service
import ( import (
"errors"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/net"
"git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/olebedev/emitter"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
) )
const registryBucketName = "registry" type RegistryService interface {
var (
_ Service = (*RegistryService)(nil)
_ RegistryServiceInterface = (*RegistryService)(nil)
)
type RegistryServiceInterface interface {
Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error
BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error
SendRegistryRequest(pk []byte) error SendRegistryRequest(pk []byte) error
Get(pk []byte) (protocol.SignedRegistryEntry, error) Get(pk []byte) (protocol.SignedRegistryEntry, error)
Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error)
} Service
type RegistryService struct {
streams structs.Map
subs structs.Map
ServiceBase
}
func (r *RegistryService) Start() error {
return nil
}
func (r *RegistryService) Stop() error {
return nil
}
func (r *RegistryService) Init() error {
return utils.CreateBucket(registryBucketName, r.db)
}
func NewRegistry(params ServiceParams) *RegistryService {
return &RegistryService{
streams: structs.NewMap(),
subs: structs.NewMap(),
ServiceBase: ServiceBase{
logger: params.Logger,
config: params.Config,
db: params.Db,
},
}
}
func (r *RegistryService) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
return err
}
pid, err := receivedFrom.Id().ToString()
if err != nil {
return err
}
r.logger.Debug("[registry] set", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))
if !trusted {
if len(sre.PK()) != 33 {
return errors.New("Invalid pubkey")
}
if int(sre.PK()[0]) != int(types.HashTypeEd25519) {
return errors.New("Only ed25519 keys are supported")
}
if sre.Revision() < 0 || sre.Revision() > 281474976710656 {
return errors.New("Invalid revision")
}
if len(sre.Data()) > types.RegistryMaxDataSize {
return errors.New("Data too long")
}
if !sre.Verify() {
return errors.New("Invalid signature found")
}
}
existingEntry, err := r.getFromDB(sre.PK())
if err != nil {
return err
}
if existingEntry != nil {
if receivedFrom != nil {
if existingEntry.Revision() == sre.Revision() {
return nil
} else if existingEntry.Revision() > sre.Revision() {
updateMessage := protocol.MarshalSignedRegistryEntry(existingEntry)
err := receivedFrom.SendMessage(updateMessage)
if err != nil {
return err
}
return nil
}
}
if existingEntry.Revision() >= sre.Revision() {
return errors.New("Revision number too low")
}
}
key := encoding.NewMultihash(sre.PK())
keyString, err := key.ToString()
if err != nil {
return err
}
eventObj, ok := r.streams.Get(keyString)
if ok {
event := eventObj.(*emitter.Emitter)
go event.Emit("fire", sre)
}
err = r.db.Update(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre))
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
err = r.BroadcastEntry(sre, receivedFrom)
if err != nil {
return err
}
return nil
}
func (r *RegistryService) BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error {
hash := encoding.NewMultihash(sre.PK())
hashString, err := hash.ToString()
if err != nil {
return err
}
pid, err := receivedFrom.Id().ToString()
if err != nil {
return err
}
r.logger.Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid))
updateMessage := protocol.MarshalSignedRegistryEntry(sre)
for _, p := range r.services.P2P().Peers().Values() {
peer, ok := p.(net.Peer)
if !ok {
continue
}
if receivedFrom == nil || peer.Id().Equals(receivedFrom.Id()) {
err := peer.SendMessage(updateMessage)
if err != nil {
pid, err := peer.Id().ToString()
if err != nil {
return err
}
r.logger.Error("Failed to send registry broadcast", zap.String("peer", pid), zap.Error(err))
return err
}
}
}
return nil
}
func (r *RegistryService) SendRegistryRequest(pk []byte) error {
query := protocol.NewRegistryQuery(pk)
request, err := msgpack.Marshal(query)
if err != nil {
return err
}
// Iterate over peers and send the request
for _, peerVal := range r.services.P2P().Peers().Values() {
peer, ok := peerVal.(net.Peer)
if !ok {
continue
}
err := peer.SendMessage(request)
if err != nil {
pid, err := peer.Id().ToString()
if err != nil {
return err
}
r.logger.Error("Failed to send registry request", zap.String("peer", pid), zap.Error(err))
return err
}
}
return nil
}
func (r *RegistryService) Get(pk []byte) (protocol.SignedRegistryEntry, error) {
key := encoding.NewMultihash(pk)
keyString, err := key.ToString()
if err != nil {
return nil, err
}
if r.subs.Contains(keyString) {
r.logger.Debug("[registry] get (cached)", zap.String("key", keyString))
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
return res, nil
}
err = r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
time.Sleep(200 * time.Millisecond)
return r.getFromDB(pk)
}
err = r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
r.subs.Put(keyString, key)
if !r.streams.Contains(keyString) {
event := &emitter.Emitter{}
r.streams.Put(keyString, event)
}
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
return res, nil
}
if res == nil {
r.logger.Debug("[registry] get (cached)", zap.String("key", keyString))
for i := 0; i < 200; i++ {
time.Sleep(10 * time.Millisecond)
res, err := r.getFromDB(pk)
if err != nil {
return nil, err
}
if res != nil {
break
}
}
}
return nil, nil
}
func (r *RegistryService) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) {
key, err := encoding.NewMultihash(pk).ToString()
if err != nil {
return nil, err
}
cbProxy := func(event *emitter.Event) {
sre, ok := event.Args[0].(protocol.SignedRegistryEntry)
if !ok {
r.logger.Error("Failed to cast event to SignedRegistryEntry")
return
}
cb(sre)
}
if !r.streams.Contains(key) {
em := emitter.New(0)
r.streams.Put(key, em)
err := r.SendRegistryRequest(pk)
if err != nil {
return nil, err
}
}
streamVal, _ := r.streams.Get(key)
stream := streamVal.(*emitter.Emitter)
channel := stream.On("fire", cbProxy)
return func() {
stream.Off("fire", channel)
}, nil
}
func (r *RegistryService) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) {
err = r.db.View(func(txn *bbolt.Tx) error {
bucket := txn.Bucket([]byte(registryBucketName))
val := bucket.Get(pk)
if val != nil {
entry, err := protocol.UnmarshalSignedRegistryEntry(val)
if err != nil {
return err
}
sre = entry
return nil
}
return nil
})
if err != nil {
return nil, err
}
return sre, nil
} }

View File

@ -2,7 +2,7 @@ package service
import ( import (
"git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/config"
bolt "go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -11,12 +11,15 @@ type Service interface {
Stop() error Stop() error
Init() error Init() error
SetServices(services Services) SetServices(services Services)
Logger() *zap.Logger
Config() *config.NodeConfig
Db() *bbolt.DB
} }
type Services interface { type Services interface {
P2P() P2PServiceInterface P2P() P2PService
Registry() RegistryServiceInterface Registry() RegistryService
HTTP() HTTPServiceInterface HTTP() HTTPService
Storage() StorageServiceInterface Storage() StorageService
All() []Service All() []Service
IsStarted() bool IsStarted() bool
Start() error Start() error
@ -26,16 +29,32 @@ type Services interface {
type ServiceParams struct { type ServiceParams struct {
Logger *zap.Logger Logger *zap.Logger
Config *config.NodeConfig Config *config.NodeConfig
Db *bolt.DB Db *bbolt.DB
} }
type ServiceBase struct { type ServiceBase struct {
logger *zap.Logger logger *zap.Logger
config *config.NodeConfig config *config.NodeConfig
db *bolt.DB db *bbolt.DB
services Services services Services
} }
func NewServiceBase(logger *zap.Logger, config *config.NodeConfig, db *bbolt.DB) ServiceBase {
return ServiceBase{logger: logger, config: config, db: db}
}
func (s *ServiceBase) SetServices(services Services) { func (s *ServiceBase) SetServices(services Services) {
s.services = services s.services = services
} }
func (s *ServiceBase) Services() Services {
return s.services
}
func (s *ServiceBase) Logger() *zap.Logger {
return s.logger
}
func (s *ServiceBase) Config() *config.NodeConfig {
return s.config
}
func (s *ServiceBase) Db() *bbolt.DB {
return s.db
}

View File

@ -1,31 +1,13 @@
package service package service
import ( import (
"errors"
"fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/storage/provider"
"git.lumeweb.com/LumeWeb/libs5-go/structs"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/libs5-go/utils"
"github.com/go-resty/resty/v2"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"time"
) )
const cacheBucketName = "object-cache" type StorageService interface {
var (
_ Service = (*StorageService)(nil)
_ storage.StorageLocationProviderStorageService = (*StorageService)(nil)
_ StorageServiceInterface = (*StorageService)(nil)
)
type StorageServiceInterface interface {
SetProviderStore(store storage.ProviderStore) SetProviderStore(store storage.ProviderStore)
ProviderStore() storage.ProviderStore ProviderStore() storage.ProviderStore
GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error)
@ -33,279 +15,5 @@ type StorageServiceInterface interface {
DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error)
DownloadBytesByCID(cid *encoding.CID) ([]byte, error) DownloadBytesByCID(cid *encoding.CID) ([]byte, error)
GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error)
} Service
type StorageService struct {
httpClient *resty.Client
metadataCache structs.Map
providerStore storage.ProviderStore
ServiceBase
}
func NewStorage(params ServiceParams) *StorageService {
return &StorageService{
httpClient: resty.New(),
metadataCache: structs.NewMap(),
ServiceBase: ServiceBase{
logger: params.Logger,
config: params.Config,
db: params.Db,
},
}
}
func (s *StorageService) Start() error {
err :=
utils.CreateBucket(cacheBucketName, s.db)
if err != nil {
return err
}
return nil
}
func (s *StorageService) Stop() error {
return nil
}
func (s *StorageService) Init() error {
return nil
}
func (n *StorageService) SetProviderStore(store storage.ProviderStore) {
n.providerStore = store
}
func (n *StorageService) ProviderStore() storage.ProviderStore {
return n.providerStore
}
func (s *StorageService) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) {
locations := make(map[string]storage.StorageLocation)
locationMap, err := s.readStorageLocationsFromDB(hash)
if err != nil {
return nil, err
}
if len(locationMap) == 0 {
return make(map[string]storage.StorageLocation), nil
}
ts := time.Now().Unix()
for _, t := range kinds {
nodeMap, ok := (locationMap)[int(t)]
if !ok {
continue
}
for key, value := range nodeMap {
expiry, ok := value[3].(int64)
if !ok || expiry < ts {
continue
}
addressesInterface, ok := value[1].([]interface{})
if !ok {
continue
}
// Create a slice to hold the strings
addresses := make([]string, len(addressesInterface))
// Convert each element to string
for i, v := range addressesInterface {
str, ok := v.(string)
if !ok {
// Handle the error, maybe skip this element or set a default value
continue
}
addresses[i] = str
}
storageLocation := storage.NewStorageLocation(int(t), addresses, expiry)
if providerMessage, ok := value[4].([]byte); ok {
(storageLocation).SetProviderMessage(providerMessage)
}
locations[key] = storageLocation
}
}
return locations, nil
}
func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) {
var locationMap storage.StorageLocationMap
err := s.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name
if b == nil {
return fmt.Errorf("bucket %s not found", cacheBucketName)
}
bytes := b.Get(hash.FullBytes())
if bytes == nil {
// If no data found, return an empty locationMap but no error
locationMap = storage.NewStorageLocationMap()
return nil
}
return msgpack.Unmarshal(bytes, &locationMap)
})
if err != nil {
return nil, err
}
return locationMap, nil
}
func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error {
// Read existing storage locations
locationDb, err := s.readStorageLocationsFromDB(hash)
if err != nil {
return err
}
nodeIdStr, err := nodeId.ToString()
if err != nil {
return err
}
// Get or create the inner map for the specific type
innerMap, exists := locationDb[location.Type()]
if !exists {
innerMap = make(storage.NodeStorage, 1)
innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1)
}
// Create location map with new data
locationMap := make(map[int]interface{}, 3)
locationMap[1] = location.Parts()
locationMap[3] = location.Expiry()
locationMap[4] = message
// Update the inner map with the new location
innerMap[nodeIdStr] = locationMap
locationDb[location.Type()] = innerMap
// Serialize the updated map and store it in the database
packedBytes, err := msgpack.Marshal(locationDb)
if err != nil {
return err
}
err = s.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(cacheBucketName))
return b.Put(hash.FullBytes(), packedBytes)
})
if err != nil {
return err
}
return nil
}
func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
// Initialize the download URI provider
dlUriProvider := provider.NewStorageLocationProvider(provider.StorageLocationProviderParams{
Services: storage.NewStorageLocationProviderServices(s.services.P2P(), s.services.Storage()),
Hash: hash,
LocationTypes: []types.StorageLocationType{
types.StorageLocationTypeFull,
types.StorageLocationTypeFile,
},
Logger: s.logger,
Config: s.config,
Db: s.db,
})
err := dlUriProvider.Start()
if err != nil {
return nil, err
}
retryCount := 0
for {
dlUri, err := dlUriProvider.Next()
if err != nil {
return nil, err
}
s.logger.Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL()))
res, err := s.httpClient.R().Get(dlUri.Location().BytesURL())
if err != nil {
err := dlUriProvider.Downvote(dlUri)
if err != nil {
return nil, err
}
retryCount++
if retryCount > 32 {
return nil, errors.New("too many retries")
}
continue
}
bodyBytes := res.Body()
return bodyBytes, nil
}
}
func (s *StorageService) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) {
bytes, err = s.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
return bytes, nil
}
func (s *StorageService) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) {
hashStr, err := cid.Hash.ToString()
if err != nil {
return nil, err
}
if s.metadataCache.Contains(hashStr) {
md, _ := s.metadataCache.Get(hashStr)
return md.(metadata.Metadata), nil
}
bytes, err := s.DownloadBytesByHash(&cid.Hash)
if err != nil {
return nil, err
}
switch cid.Type {
case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method
md = metadata.NewEmptyMediaMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeMetadataWebapp:
md = metadata.NewEmptyWebAppMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
case types.CIDTypeDirectory:
md = metadata.NewEmptyDirectoryMetadata()
err = msgpack.Unmarshal(bytes, md)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported metadata format")
}
s.metadataCache.Put(hashStr, md)
return md, nil
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/config"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/service"
"git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/types"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
@ -16,7 +17,7 @@ import (
var _ storage.StorageLocationProvider = (*StorageLocationProviderImpl)(nil) var _ storage.StorageLocationProvider = (*StorageLocationProviderImpl)(nil)
type StorageLocationProviderImpl struct { type StorageLocationProviderImpl struct {
services storage.StorageLocationProviderServices services service.Services
hash *encoding.Multihash hash *encoding.Multihash
types []types.StorageLocationType types []types.StorageLocationType
timeoutDuration time.Duration timeoutDuration time.Duration
@ -193,10 +194,11 @@ func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool {
} }
type StorageLocationProviderParams struct { type StorageLocationProviderParams struct {
Services storage.StorageLocationProviderServices Services service.Services
Hash *encoding.Multihash Hash *encoding.Multihash
LocationTypes []types.StorageLocationType LocationTypes []types.StorageLocationType
Logger *zap.Logger Logger *zap.Logger
Config *config.NodeConfig Config *config.NodeConfig
Db *bolt.DB Db *bolt.DB
service.ServiceParams
} }

View File

@ -3,7 +3,6 @@ package storage
import ( import (
"fmt" "fmt"
"git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/encoding"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"github.com/vmihailenco/msgpack/v5" "github.com/vmihailenco/msgpack/v5"
"strconv" "strconv"
"time" "time"
@ -128,34 +127,3 @@ type StorageLocationProvider interface {
Upvote(uri SignedStorageLocation) error Upvote(uri SignedStorageLocation) error
Downvote(uri SignedStorageLocation) error Downvote(uri SignedStorageLocation) error
} }
type StorageLocationProviderServices interface {
P2P() StorageLocationProviderP2PService
Storage() StorageLocationProviderStorageService
}
type StorageLocationProviderP2PService interface {
SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error)
SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error
UpVote(nodeId *encoding.NodeId) error
DownVote(nodeId *encoding.NodeId) error
}
type StorageLocationProviderStorageService interface {
GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]StorageLocation, error)
}
type StorageLocationProviderServicesImpl struct {
p2p StorageLocationProviderP2PService
storage StorageLocationProviderStorageService
}
func NewStorageLocationProviderServices(p2p StorageLocationProviderP2PService, storage StorageLocationProviderStorageService) *StorageLocationProviderServicesImpl {
return &StorageLocationProviderServicesImpl{p2p: p2p, storage: storage}
}
func (s *StorageLocationProviderServicesImpl) P2P() StorageLocationProviderP2PService {
return s.p2p
}
func (s *StorageLocationProviderServicesImpl) Storage() StorageLocationProviderStorageService {
return s.storage
}