diff --git a/fx/fx.go b/fx/fx.go index b086b40..2618978 100644 --- a/fx/fx.go +++ b/fx/fx.go @@ -3,6 +3,7 @@ package fx import ( "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/service" + _default "git.lumeweb.com/LumeWeb/libs5-go/service/default" "go.uber.org/fx" ) @@ -25,19 +26,19 @@ type ServicesParams struct { node.ServicesParams } -func newP2P(params ServiceParams) *service.P2PService { - return service.NewP2P(params.ServiceParams) +func newP2P(params ServiceParams) service.P2PService { + return _default.NewP2P(params.ServiceParams) } -func newRegistry(params ServiceParams) *service.RegistryService { - return service.NewRegistry(params.ServiceParams) +func newRegistry(params ServiceParams) service.RegistryService { + return _default.NewRegistry(params.ServiceParams) } -func newHTTP(params ServiceParams) *service.HTTPService { - return service.NewHTTP(params.ServiceParams) +func newHTTP(params ServiceParams) service.HTTPService { + return _default.NewHTTP(params.ServiceParams) } -func newStorage(params ServiceParams) *service.StorageService { - return service.NewStorage(params.ServiceParams) +func newStorage(params ServiceParams) service.StorageService { + return _default.NewStorage(params.ServiceParams) } func newServices(params ServicesParams) service.Services { diff --git a/service/default/http.go b/service/default/http.go new file mode 100644 index 0000000..f8227d3 --- /dev/null +++ b/service/default/http.go @@ -0,0 +1,117 @@ +package _default + +import ( + "git.lumeweb.com/LumeWeb/libs5-go/build" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/service" + "github.com/julienschmidt/httprouter" + "go.sia.tech/jape" + "go.uber.org/zap" + "net/url" + "nhooyr.io/websocket" +) + +var _ service.Service = (*HTTPServiceDefault)(nil) + +type P2PNodesResponse struct { + Nodes []P2PNodeResponse `json:"nodes"` +} + +type P2PNodeResponse struct { + Id string `json:"id"` + Uris []string `json:"uris"` +} + +type HTTPServiceDefault struct { + service.ServiceBase +} + +func NewHTTP(params service.ServiceParams) *HTTPServiceDefault { + return &HTTPServiceDefault{ + ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db), + } +} + +func (h *HTTPServiceDefault) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router { + routes := map[string]jape.Handler{ + "GET /s5/version": h.versionHandler, + "GET /s5/p2p": h.p2pHandler, + "GET /s5/p2p/nodes": h.p2pNodesHandler, + } + + for k, v := range inject { + routes[k] = v + } + + return jape.Mux(routes) +} + +func (h *HTTPServiceDefault) Start() error { + return nil +} + +func (h *HTTPServiceDefault) Stop() error { + return nil +} + +func (h *HTTPServiceDefault) Init() error { + return nil +} + +func (h *HTTPServiceDefault) versionHandler(ctx jape.Context) { + _, _ = ctx.ResponseWriter.Write([]byte(build.Version)) +} +func (h *HTTPServiceDefault) p2pHandler(ctx jape.Context) { + c, err := websocket.Accept(ctx.ResponseWriter, ctx.Request, nil) + if err != nil { + h.Logger().Error("error accepting websocket connection", zap.Error(err)) + return + } + + peer, err := net.CreateTransportPeer("wss", &net.TransportPeerConfig{ + Socket: c, + Uris: []*url.URL{}, + }) + + if err != nil { + h.Logger().Error("error creating transport peer", zap.Error(err)) + err := c.Close(websocket.StatusInternalError, "the sky is falling") + if err != nil { + h.Logger().Error("error closing websocket connection", zap.Error(err)) + } + return + } + + h.Services().P2P().ConnectionTracker().Add(1) + + go func() { + err := h.Services().P2P().OnNewPeer(peer, false) + if err != nil { + h.Logger().Error("error handling new peer", zap.Error(err)) + } + h.Services().P2P().ConnectionTracker().Done() + }() +} + +func (h *HTTPServiceDefault) p2pNodesHandler(ctx jape.Context) { + localId, err := h.Services().P2P().NodeId().ToString() + + if ctx.Check("error getting local node id", err) != nil { + return + } + + uris := h.Services().P2P().SelfConnectionUris() + + nodeList := make([]P2PNodeResponse, len(uris)) + + for i, uri := range uris { + nodeList[i] = P2PNodeResponse{ + Id: localId, + Uris: []string{uri.String()}, + } + } + + ctx.Encode(P2PNodesResponse{ + Nodes: nodeList, + }) +} diff --git a/service/default/p2p.go b/service/default/p2p.go new file mode 100644 index 0000000..2606357 --- /dev/null +++ b/service/default/p2p.go @@ -0,0 +1,676 @@ +package _default + +import ( + "bytes" + "context" + "errors" + "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/ed25519" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" + "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" + "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" + "git.lumeweb.com/LumeWeb/libs5-go/service" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "git.lumeweb.com/LumeWeb/libs5-go/utils" + "github.com/vmihailenco/msgpack/v5" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" + "net/url" + "sort" + "sync" + "time" +) + +var _ service.P2PService = (*P2PServiceDefault)(nil) + +var ( + errUnsupportedProtocol = errors.New("unsupported protocol") + errConnectionIdMissingNodeID = errors.New("connection id missing node id") +) + +const nodeBucketName = "nodes" + +type P2PServiceDefault struct { + nodeKeyPair *ed25519.KeyPairEd25519 + localNodeID *encoding.NodeId + networkID string + nodesBucket *bolt.Bucket + inited bool + reconnectDelay structs.Map + peers structs.Map + peersPending structs.Map + selfConnectionUris []*url.URL + outgoingPeerBlocklist structs.Map + incomingPeerBlockList structs.Map + incomingIPBlocklist structs.Map + outgoingPeerFailures structs.Map + maxOutgoingPeerFailures uint + connections sync.WaitGroup + hashQueryRoutingTable structs.Map + service.ServiceBase +} + +func NewP2P(params service.ServiceParams) *P2PServiceDefault { + uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", params.Config.HTTP.API.Domain, params.Config.HTTP.API.Port)) + if err != nil { + params.Logger.Fatal("failed to parse HTTP API URL", zap.Error(err)) + } + + service := &P2PServiceDefault{ + nodeKeyPair: params.Config.KeyPair, + networkID: params.Config.P2P.Network, + inited: false, + reconnectDelay: structs.NewMap(), + peers: structs.NewMap(), + peersPending: structs.NewMap(), + selfConnectionUris: []*url.URL{uri}, + outgoingPeerBlocklist: structs.NewMap(), + incomingPeerBlockList: structs.NewMap(), + incomingIPBlocklist: structs.NewMap(), + outgoingPeerFailures: structs.NewMap(), + maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures, + hashQueryRoutingTable: structs.NewMap(), + ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db), + } + + return service +} + +func (p *P2PServiceDefault) SelfConnectionUris() []*url.URL { + return p.selfConnectionUris +} + +func (p *P2PServiceDefault) Peers() structs.Map { + return p.peers +} + +func (p *P2PServiceDefault) Start() error { + config := p.Config() + if len(config.P2P.Peers.Initial) > 0 { + initialPeers := config.P2P.Peers.Initial + + for _, peer := range initialPeers { + u, err := url.Parse(peer) + if err != nil { + return err + } + + peer := peer + go func() { + err := p.ConnectToNode([]*url.URL{u}, false, nil) + if err != nil { + p.Logger().Error("failed to connect to initial peer", zap.Error(err), zap.String("peer", peer)) + } + }() + } + } + + return nil +} + +func (p *P2PServiceDefault) Stop() error { + return nil +} + +func (p *P2PServiceDefault) Init() error { + if p.inited { + return nil + } + p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey()) + + err := utils.CreateBucket(nodeBucketName, p.Db()) + + if err != nil { + return err + } + + p.inited = true + + return nil +} +func (p *P2PServiceDefault) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error { + if !p.Services().IsStarted() { + return nil + } + + unsupported, _ := url.Parse("http://0.0.0.0") + unsupported.Scheme = "unsupported" + + var connectionUri *url.URL + + for _, uri := range connectionUris { + if uri.Scheme == "ws" || uri.Scheme == "wss" { + connectionUri = uri + break + } + } + + if connectionUri == nil { + for _, uri := range connectionUris { + if uri.Scheme == "tcp" { + connectionUri = uri + break + } + } + } + + if connectionUri == nil { + connectionUri = unsupported + } + + if connectionUri.Scheme == "unsupported" { + return errUnsupportedProtocol + } + + scheme := connectionUri.Scheme + + if connectionUri.User == nil { + return errConnectionIdMissingNodeID + } + + username := connectionUri.User.Username() + id, err := encoding.DecodeNodeId(username) + if err != nil { + return err + } + + idString, err := id.ToString() + if err != nil { + return err + } + + if p.peersPending.Contains(idString) || p.peers.Contains(idString) { + p.Logger().Debug("already connected", zap.String("node", connectionUri.String())) + return nil + } + + if p.outgoingPeerBlocklist.Contains(idString) { + p.Logger().Debug("outgoing peer is on blocklist", zap.String("node", connectionUri.String())) + + var fromPeerId string + + if fromPeer != nil { + blocked := false + if fromPeer.Id() != nil { + fromPeerId, err = fromPeer.Id().ToString() + if err != nil { + return err + } + if !p.incomingPeerBlockList.Contains(fromPeerId) { + p.incomingPeerBlockList.Put(fromPeerId, true) + blocked = true + } + } + + fromPeerIP := fromPeer.GetIP() + + if !p.incomingIPBlocklist.Contains(fromPeerIP) { + p.incomingIPBlocklist.Put(fromPeerIP, true) + blocked = true + } + err = fromPeer.EndForAbuse() + if err != nil { + return err + } + + if blocked { + p.Logger().Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP)) + } + } + return nil + } + + reconnectDelay := p.reconnectDelay.GetUInt(idString) + if reconnectDelay == nil { + delay := uint(1) + reconnectDelay = &delay + } + + if id.Equals(p.localNodeID) { + return nil + } + + p.Logger().Debug("connect", zap.String("node", connectionUri.String())) + + socket, err := net.CreateTransportSocket(scheme, connectionUri) + if err != nil { + if retried { + p.Logger().Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err)) + counter := uint(0) + if p.outgoingPeerFailures.Contains(idString) { + tmp := *p.outgoingPeerFailures.GetUInt(idString) + counter = tmp + } + + counter++ + + p.outgoingPeerFailures.PutUInt(idString, counter) + + if counter >= p.maxOutgoingPeerFailures { + + if fromPeer != nil { + blocked := false + var fromPeerId string + if fromPeer.Id() != nil { + fromPeerId, err = fromPeer.Id().ToString() + if err != nil { + return err + } + if !p.incomingPeerBlockList.Contains(fromPeerId) { + p.incomingPeerBlockList.Put(fromPeerId, true) + blocked = true + } + } + + fromPeerIP := fromPeer.GetIP() + if !p.incomingIPBlocklist.Contains(fromPeerIP) { + p.incomingIPBlocklist.Put(fromPeerIP, true) + blocked = true + } + err = fromPeer.EndForAbuse() + if err != nil { + return err + } + + if blocked { + p.Logger().Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP)) + } + } + p.outgoingPeerBlocklist.Put(idString, true) + p.Logger().Debug("blocking peer for too many failures", zap.String("node", connectionUri.String())) + } + + return nil + } + retried = true + + p.Logger().Error("failed to connect", zap.String("node", connectionUri.String()), zap.Error(err)) + + delay := p.reconnectDelay.GetUInt(idString) + if delay == nil { + tmp := uint(1) + delay = &tmp + } + delayDeref := *delay + p.reconnectDelay.PutUInt(idString, delayDeref*2) + + time.Sleep(time.Duration(delayDeref) * time.Second) + + return p.ConnectToNode(connectionUris, retried, fromPeer) + } + + if p.outgoingPeerFailures.Contains(idString) { + p.outgoingPeerFailures.Remove(idString) + } + + peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{ + Socket: socket, + Uris: []*url.URL{connectionUri}, + }) + + if err != nil { + return err + } + + peer.SetId(id) + + p.Services().P2P().ConnectionTracker().Add(1) + + peerId, err := peer.Id().ToString() + if err != nil { + return err + } + p.peersPending.Put(peerId, peer) + + go func() { + err := p.OnNewPeer(peer, true) + if err != nil && !peer.Abuser() { + p.Logger().Error("peer error", zap.Error(err)) + } + p.Services().P2P().ConnectionTracker().Done() + }() + + return nil + +} + +func (p *P2PServiceDefault) OnNewPeer(peer net.Peer, verifyId bool) error { + var wg sync.WaitGroup + + var pid string + + if peer.Id() != nil { + pid, _ = peer.Id().ToString() + } else { + pid = "unknown" + } + + pip := peer.GetIP() + + if p.incomingIPBlocklist.Contains(pid) { + p.Logger().Error("peer is on identity blocklist", zap.String("peer", pid)) + err := peer.EndForAbuse() + if err != nil { + return err + } + return nil + } + if p.incomingPeerBlockList.Contains(pip) { + p.Logger().Debug("peer is on ip blocklist", zap.String("peer", pid), zap.String("ip", pip)) + err := peer.EndForAbuse() + if err != nil { + return err + } + return nil + } + + p.Logger().Debug("OnNewPeer started", zap.String("peer", pid)) + + challenge := protocol.GenerateChallenge() + peer.SetChallenge(challenge) + + wg.Add(1) + go func() { + defer wg.Done() + p.OnNewPeerListen(peer, verifyId) + }() + + handshakeOpenMsg, err := msgpack.Marshal(protocol.NewHandshakeOpen(challenge, p.networkID)) + if err != nil { + return err + } + + err = peer.SendMessage(handshakeOpenMsg) + if err != nil { + return err + } + p.Logger().Debug("OnNewPeer sent handshake", zap.String("peer", pid)) + + p.Logger().Debug("OnNewPeer before Wait", zap.String("peer", pid)) + wg.Wait() // Wait for OnNewPeerListen goroutine to finish + p.Logger().Debug("OnNewPeer ended", zap.String("peer", pid)) + return nil +} +func (p *P2PServiceDefault) OnNewPeerListen(peer net.Peer, verifyId bool) { + onDone := net.CloseCallback(func() { + if peer.Id() != nil { + pid, err := peer.Id().ToString() + if err != nil { + p.Logger().Error("failed to get peer id", zap.Error(err)) + return + } + // Handle closure of the connection + if p.peers.Contains(pid) { + p.peers.Remove(pid) + } + if p.peersPending.Contains(pid) { + p.peersPending.Remove(pid) + } + } + }) + + onError := net.ErrorCallback(func(args ...interface{}) { + if !peer.Abuser() { + p.Logger().Error("peer error", zap.Any("args", args)) + } + }) + + peer.ListenForMessages(func(message []byte) error { + var reader base.IncomingMessageReader + + err := msgpack.Unmarshal(message, &reader) + if err != nil { + p.Logger().Error("Error decoding basic message info", zap.Error(err)) + return err + } + + // Now, get the specific message handler based on the message kind + handler, ok := protocol.GetMessageType(reader.Kind) + if !ok { + p.Logger().Error("Unknown message type", zap.Int("type", reader.Kind)) + return fmt.Errorf("unknown message type: %d", reader.Kind) + } + + data := base.IncomingMessageData{ + Original: message, + Data: reader.Data, + Ctx: context.Background(), + Peer: peer, + VerifyId: verifyId, + Config: p.Config(), + } + + dec := msgpack.NewDecoder(bytes.NewReader(reader.Data)) + + err = handler.DecodeMessage(dec, data) + if err != nil { + p.Logger().Error("Error decoding message", zap.Error(err)) + return err + } + + // Directly decode and handle the specific message type + if err := handler.HandleMessage(data); err != nil { + p.Logger().Error("Error handling message", zap.Error(err)) + return err + } + + return nil + }, net.ListenerOptions{ + OnClose: &onDone, + OnError: &onError, + Logger: p.Logger(), + }) +} + +func (p *P2PServiceDefault) readNodeVotes(nodeId *encoding.NodeId) (service.NodeVotes, error) { + var value []byte + var found bool + err := p.Db().View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(nodeBucketName)) + if b == nil { + return fmt.Errorf("Bucket %s not found", nodeBucketName) + } + value = b.Get(nodeId.Raw()) + if value == nil { + return nil + } + found = true + return nil + }) + if err != nil { + return nil, err + } + if !found { + return service.NewNodeVotes(), nil + } + + var score service.NodeVotes + err = msgpack.Unmarshal(value, &score) + if err != nil { + return nil, err + } + + return score, nil +} + +func (p *P2PServiceDefault) saveNodeVotes(nodeId *encoding.NodeId, votes service.NodeVotes) error { + // Marshal the votes into data + data, err := msgpack.Marshal(votes) + if err != nil { + return err + } + + // Use a transaction to save the data + err = p.Db().Update(func(tx *bolt.Tx) error { + // Get or create the bucket + b := tx.Bucket([]byte(nodeBucketName)) + + // Put the data into the bucket + return b.Put(nodeId.Raw(), data) + }) + + return err +} + +func (p *P2PServiceDefault) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { + if nodeId.Equals(p.localNodeID) { + return 1, nil + } + + score, err := p.readNodeVotes(nodeId) + if err != nil { + return 0.5, err + } + + return protocol.CalculateNodeScore(score.Good(), score.Bad()), nil + +} +func (p *P2PServiceDefault) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) { + scores := make(map[encoding.NodeIdCode]float64) + var errOccurred error + + for _, nodeId := range nodes { + score, err := p.GetNodeScore(nodeId) + if err != nil { + errOccurred = err + scores[nodeId.HashCode()] = 0 // You may choose a different default value for error cases + } else { + scores[nodeId.HashCode()] = score + } + } + + sort.Slice(nodes, func(i, j int) bool { + return scores[nodes[i].HashCode()] > scores[nodes[j].HashCode()] + }) + + return nodes, errOccurred +} +func (p *P2PServiceDefault) SignMessageSimple(message []byte) ([]byte, error) { + signedMessage := signed.NewSignedMessageRequest(message) + signedMessage.SetNodeId(p.localNodeID) + + err := signedMessage.Sign(p.Config()) + + if err != nil { + return nil, err + } + + result, err := msgpack.Marshal(signedMessage) + + if err != nil { + return nil, err + } + + return result, nil +} + +func (p *P2PServiceDefault) AddPeer(peer net.Peer) error { + peerId, err := peer.Id().ToString() + if err != nil { + return err + } + p.peers.Put(peerId, peer) + p.reconnectDelay.PutUInt(peerId, 1) + + if p.peersPending.Contains(peerId) { + p.peersPending.Remove(peerId) + } + + return nil +} +func (p *P2PServiceDefault) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error { + announceRequest := signed.NewAnnounceRequest(peer, peersToSend) + + message, err := msgpack.Marshal(announceRequest) + + if err != nil { + return err + } + + signedMessage, err := p.SignMessageSimple(message) + + if err != nil { + return err + } + + err = peer.SendMessage(signedMessage) + + return nil +} +func (p *P2PServiceDefault) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error { + hashRequest := protocol.NewHashRequest(hash, kinds) + message, err := msgpack.Marshal(hashRequest) + if err != nil { + return err + } + + for _, peer := range p.peers.Values() { + peerValue, ok := peer.(net.Peer) + if !ok { + p.Logger().Error("failed to cast peer to net.Peer") + continue + } + err = peerValue.SendMessage(message) + } + + return nil +} + +func (p *P2PServiceDefault) UpVote(nodeId *encoding.NodeId) error { + err := p.vote(nodeId, true) + if err != nil { + return err + } + + return nil +} + +func (p *P2PServiceDefault) DownVote(nodeId *encoding.NodeId) error { + err := p.vote(nodeId, false) + if err != nil { + return err + } + + return nil +} + +func (p *P2PServiceDefault) vote(nodeId *encoding.NodeId, upvote bool) error { + votes, err := p.readNodeVotes(nodeId) + if err != nil { + return err + } + + if upvote { + votes.Upvote() + } else { + votes.Downvote() + } + + err = p.saveNodeVotes(nodeId, votes) + if err != nil { + return err + } + + return nil +} +func (p *P2PServiceDefault) NodeId() *encoding.NodeId { + return p.localNodeID +} + +func (p *P2PServiceDefault) WaitOnConnectedPeers() { + p.connections.Wait() +} + +func (p *P2PServiceDefault) ConnectionTracker() *sync.WaitGroup { + return &p.connections +} + +func (p *P2PServiceDefault) NetworkId() string { + return p.Config().P2P.Network +} +func (n *P2PServiceDefault) HashQueryRoutingTable() structs.Map { + return n.hashQueryRoutingTable +} diff --git a/service/default/registry.go b/service/default/registry.go new file mode 100644 index 0000000..978baa1 --- /dev/null +++ b/service/default/registry.go @@ -0,0 +1,311 @@ +package _default + +import ( + "errors" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/net" + "git.lumeweb.com/LumeWeb/libs5-go/protocol" + "git.lumeweb.com/LumeWeb/libs5-go/service" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "git.lumeweb.com/LumeWeb/libs5-go/utils" + "github.com/olebedev/emitter" + "github.com/vmihailenco/msgpack/v5" + "go.etcd.io/bbolt" + "go.uber.org/zap" + "time" +) + +const registryBucketName = "registry" + +var ( + _ service.Service = (*RegistryServiceDefault)(nil) + _ service.RegistryService = (*RegistryServiceDefault)(nil) +) + +type RegistryServiceDefault struct { + streams structs.Map + subs structs.Map + service.ServiceBase +} + +func (r *RegistryServiceDefault) Start() error { + return nil +} + +func (r *RegistryServiceDefault) Stop() error { + return nil +} + +func (r *RegistryServiceDefault) Init() error { + return utils.CreateBucket(registryBucketName, r.Db()) +} + +func NewRegistry(params service.ServiceParams) *RegistryServiceDefault { + return &RegistryServiceDefault{ + streams: structs.NewMap(), + subs: structs.NewMap(), + ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db), + } +} +func (r *RegistryServiceDefault) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error { + hash := encoding.NewMultihash(sre.PK()) + hashString, err := hash.ToString() + if err != nil { + return err + } + pid, err := receivedFrom.Id().ToString() + if err != nil { + return err + } + r.Logger().Debug("[registry] set", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid)) + + if !trusted { + if len(sre.PK()) != 33 { + return errors.New("Invalid pubkey") + } + if int(sre.PK()[0]) != int(types.HashTypeEd25519) { + return errors.New("Only ed25519 keys are supported") + } + if sre.Revision() < 0 || sre.Revision() > 281474976710656 { + return errors.New("Invalid revision") + } + if len(sre.Data()) > types.RegistryMaxDataSize { + return errors.New("Data too long") + } + + if !sre.Verify() { + return errors.New("Invalid signature found") + } + } + + existingEntry, err := r.getFromDB(sre.PK()) + if err != nil { + return err + } + + if existingEntry != nil { + if receivedFrom != nil { + if existingEntry.Revision() == sre.Revision() { + return nil + } else if existingEntry.Revision() > sre.Revision() { + updateMessage := protocol.MarshalSignedRegistryEntry(existingEntry) + err := receivedFrom.SendMessage(updateMessage) + if err != nil { + return err + } + return nil + } + } + + if existingEntry.Revision() >= sre.Revision() { + return errors.New("Revision number too low") + } + } + + key := encoding.NewMultihash(sre.PK()) + keyString, err := key.ToString() + if err != nil { + return err + } + + eventObj, ok := r.streams.Get(keyString) + if ok { + event := eventObj.(*emitter.Emitter) + go event.Emit("fire", sre) + } + + err = r.Db().Update(func(txn *bbolt.Tx) error { + bucket := txn.Bucket([]byte(registryBucketName)) + err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre)) + if err != nil { + return err + } + return nil + }) + + if err != nil { + return err + } + + err = r.BroadcastEntry(sre, receivedFrom) + if err != nil { + return err + } + + return nil +} +func (r *RegistryServiceDefault) BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error { + hash := encoding.NewMultihash(sre.PK()) + hashString, err := hash.ToString() + if err != nil { + return err + } + pid, err := receivedFrom.Id().ToString() + if err != nil { + return err + } + r.Logger().Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid)) + updateMessage := protocol.MarshalSignedRegistryEntry(sre) + + for _, p := range r.Services().P2P().Peers().Values() { + peer, ok := p.(net.Peer) + if !ok { + continue + } + if receivedFrom == nil || peer.Id().Equals(receivedFrom.Id()) { + err := peer.SendMessage(updateMessage) + if err != nil { + pid, err := peer.Id().ToString() + if err != nil { + return err + } + r.Logger().Error("Failed to send registry broadcast", zap.String("peer", pid), zap.Error(err)) + return err + } + } + } + + return nil +} +func (r *RegistryServiceDefault) SendRegistryRequest(pk []byte) error { + query := protocol.NewRegistryQuery(pk) + + request, err := msgpack.Marshal(query) + if err != nil { + return err + } + + // Iterate over peers and send the request + for _, peerVal := range r.Services().P2P().Peers().Values() { + peer, ok := peerVal.(net.Peer) + if !ok { + continue + } + err := peer.SendMessage(request) + if err != nil { + pid, err := peer.Id().ToString() + if err != nil { + return err + } + r.Logger().Error("Failed to send registry request", zap.String("peer", pid), zap.Error(err)) + return err + } + } + + return nil +} +func (r *RegistryServiceDefault) Get(pk []byte) (protocol.SignedRegistryEntry, error) { + key := encoding.NewMultihash(pk) + keyString, err := key.ToString() + if err != nil { + return nil, err + } + + if r.subs.Contains(keyString) { + r.Logger().Debug("[registry] get (cached)", zap.String("key", keyString)) + res, err := r.getFromDB(pk) + if err != nil { + return nil, err + } + if res != nil { + return res, nil + } + + err = r.SendRegistryRequest(pk) + if err != nil { + return nil, err + } + time.Sleep(200 * time.Millisecond) + return r.getFromDB(pk) + } + + err = r.SendRegistryRequest(pk) + if err != nil { + return nil, err + } + r.subs.Put(keyString, key) + if !r.streams.Contains(keyString) { + event := &emitter.Emitter{} + r.streams.Put(keyString, event) + } + + res, err := r.getFromDB(pk) + if err != nil { + return nil, err + } + + if res != nil { + return res, nil + } + + if res == nil { + r.Logger().Debug("[registry] get (cached)", zap.String("key", keyString)) + for i := 0; i < 200; i++ { + time.Sleep(10 * time.Millisecond) + res, err := r.getFromDB(pk) + if err != nil { + return nil, err + } + if res != nil { + break + } + } + } + + return nil, nil +} + +func (r *RegistryServiceDefault) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) { + key, err := encoding.NewMultihash(pk).ToString() + if err != nil { + return nil, err + } + + cbProxy := func(event *emitter.Event) { + sre, ok := event.Args[0].(protocol.SignedRegistryEntry) + if !ok { + r.Logger().Error("Failed to cast event to SignedRegistryEntry") + return + } + + cb(sre) + } + + if !r.streams.Contains(key) { + em := emitter.New(0) + r.streams.Put(key, em) + err := r.SendRegistryRequest(pk) + if err != nil { + return nil, err + } + } + streamVal, _ := r.streams.Get(key) + stream := streamVal.(*emitter.Emitter) + channel := stream.On("fire", cbProxy) + + return func() { + stream.Off("fire", channel) + }, nil +} + +func (r *RegistryServiceDefault) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) { + err = r.Db().View(func(txn *bbolt.Tx) error { + bucket := txn.Bucket([]byte(registryBucketName)) + val := bucket.Get(pk) + if val != nil { + entry, err := protocol.UnmarshalSignedRegistryEntry(val) + if err != nil { + return err + } + sre = entry + return nil + } + return nil + }) + if err != nil { + return nil, err + } + + return sre, nil +} diff --git a/service/default/service.go b/service/default/service.go new file mode 100644 index 0000000..ef91f17 --- /dev/null +++ b/service/default/service.go @@ -0,0 +1 @@ +package _default diff --git a/service/default/storage.go b/service/default/storage.go new file mode 100644 index 0000000..f2a2579 --- /dev/null +++ b/service/default/storage.go @@ -0,0 +1,297 @@ +package _default + +import ( + "errors" + "fmt" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/metadata" + "git.lumeweb.com/LumeWeb/libs5-go/service" + "git.lumeweb.com/LumeWeb/libs5-go/storage" + "git.lumeweb.com/LumeWeb/libs5-go/storage/provider" + "git.lumeweb.com/LumeWeb/libs5-go/structs" + "git.lumeweb.com/LumeWeb/libs5-go/types" + "git.lumeweb.com/LumeWeb/libs5-go/utils" + "github.com/go-resty/resty/v2" + "github.com/vmihailenco/msgpack/v5" + "go.etcd.io/bbolt" + "go.uber.org/zap" + "time" +) + +const cacheBucketName = "object-cache" + +var ( + _ service.Service = (*StorageService)(nil) + _ service.StorageService = (*StorageService)(nil) +) + +type StorageService struct { + httpClient *resty.Client + metadataCache structs.Map + providerStore storage.ProviderStore + service.ServiceBase +} + +func NewStorage(params service.ServiceParams) *StorageService { + return &StorageService{ + httpClient: resty.New(), + metadataCache: structs.NewMap(), + ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db), + } +} + +func (s *StorageService) Start() error { + err := + utils.CreateBucket(cacheBucketName, s.Db()) + + if err != nil { + return err + } + + return nil +} + +func (s *StorageService) Stop() error { + return nil +} + +func (s *StorageService) Init() error { + return nil +} + +func (n *StorageService) SetProviderStore(store storage.ProviderStore) { + n.providerStore = store +} + +func (n *StorageService) ProviderStore() storage.ProviderStore { + return n.providerStore +} + +func (s *StorageService) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { + locations := make(map[string]storage.StorageLocation) + + locationMap, err := s.readStorageLocationsFromDB(hash) + if err != nil { + return nil, err + } + if len(locationMap) == 0 { + return make(map[string]storage.StorageLocation), nil + } + + ts := time.Now().Unix() + + for _, t := range kinds { + nodeMap, ok := (locationMap)[int(t)] + if !ok { + continue + } + + for key, value := range nodeMap { + expiry, ok := value[3].(int64) + if !ok || expiry < ts { + continue + } + + addressesInterface, ok := value[1].([]interface{}) + if !ok { + continue + } + + // Create a slice to hold the strings + addresses := make([]string, len(addressesInterface)) + + // Convert each element to string + for i, v := range addressesInterface { + str, ok := v.(string) + if !ok { + // Handle the error, maybe skip this element or set a default value + continue + } + addresses[i] = str + } + + storageLocation := storage.NewStorageLocation(int(t), addresses, expiry) + + if providerMessage, ok := value[4].([]byte); ok { + (storageLocation).SetProviderMessage(providerMessage) + } + + locations[key] = storageLocation + } + } + return locations, nil +} +func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { + var locationMap storage.StorageLocationMap + + err := s.Db().View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name + if b == nil { + return fmt.Errorf("bucket %s not found", cacheBucketName) + } + + bytes := b.Get(hash.FullBytes()) + if bytes == nil { + // If no data found, return an empty locationMap but no error + locationMap = storage.NewStorageLocationMap() + return nil + } + + return msgpack.Unmarshal(bytes, &locationMap) + }) + + if err != nil { + return nil, err + } + + return locationMap, nil +} + +func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error { + // Read existing storage locations + locationDb, err := s.readStorageLocationsFromDB(hash) + if err != nil { + return err + } + + nodeIdStr, err := nodeId.ToString() + if err != nil { + return err + } + + // Get or create the inner map for the specific type + innerMap, exists := locationDb[location.Type()] + if !exists { + innerMap = make(storage.NodeStorage, 1) + innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1) + } + + // Create location map with new data + locationMap := make(map[int]interface{}, 3) + locationMap[1] = location.Parts() + locationMap[3] = location.Expiry() + locationMap[4] = message + + // Update the inner map with the new location + innerMap[nodeIdStr] = locationMap + locationDb[location.Type()] = innerMap + + // Serialize the updated map and store it in the database + packedBytes, err := msgpack.Marshal(locationDb) + if err != nil { + return err + } + err = s.Db().Update(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(cacheBucketName)) + + return b.Put(hash.FullBytes(), packedBytes) + }) + if err != nil { + return err + } + + return nil +} + +func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { + // Initialize the download URI provider + dlUriProvider := provider.NewStorageLocationProvider(provider.StorageLocationProviderParams{ + Services: s.Services(), + Hash: hash, + LocationTypes: []types.StorageLocationType{ + types.StorageLocationTypeFull, + types.StorageLocationTypeFile, + }, + Logger: s.Logger(), + Config: s.Config(), + Db: s.Db(), + }) + err := dlUriProvider.Start() + if err != nil { + return nil, err + } + + retryCount := 0 + for { + dlUri, err := dlUriProvider.Next() + if err != nil { + return nil, err + } + + s.Logger().Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL())) + + res, err := s.httpClient.R().Get(dlUri.Location().BytesURL()) + if err != nil { + err := dlUriProvider.Downvote(dlUri) + if err != nil { + return nil, err + } + retryCount++ + if retryCount > 32 { + return nil, errors.New("too many retries") + } + continue + } + + bodyBytes := res.Body() + + return bodyBytes, nil + } +} + +func (s *StorageService) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) { + bytes, err = s.DownloadBytesByHash(&cid.Hash) + if err != nil { + return nil, err + } + + return bytes, nil +} + +func (s *StorageService) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) { + hashStr, err := cid.Hash.ToString() + if err != nil { + return nil, err + } + + if s.metadataCache.Contains(hashStr) { + md, _ := s.metadataCache.Get(hashStr) + + return md.(metadata.Metadata), nil + } + + bytes, err := s.DownloadBytesByHash(&cid.Hash) + if err != nil { + return nil, err + } + + switch cid.Type { + case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method + md = metadata.NewEmptyMediaMetadata() + + err = msgpack.Unmarshal(bytes, md) + if err != nil { + return nil, err + } + case types.CIDTypeMetadataWebapp: + md = metadata.NewEmptyWebAppMetadata() + + err = msgpack.Unmarshal(bytes, md) + if err != nil { + return nil, err + } + case types.CIDTypeDirectory: + md = metadata.NewEmptyDirectoryMetadata() + + err = msgpack.Unmarshal(bytes, md) + if err != nil { + return nil, err + } + default: + return nil, errors.New("unsupported metadata format") + } + + s.metadataCache.Put(hashStr, md) + + return md, nil +} diff --git a/service/http.go b/service/http.go index d84be17..df8c41b 100644 --- a/service/http.go +++ b/service/http.go @@ -1,124 +1,11 @@ package service import ( - "git.lumeweb.com/LumeWeb/libs5-go/build" - "git.lumeweb.com/LumeWeb/libs5-go/net" "github.com/julienschmidt/httprouter" "go.sia.tech/jape" - "go.uber.org/zap" - "net/url" - "nhooyr.io/websocket" ) -var _ Service = (*HTTPService)(nil) - -type P2PNodesResponse struct { - Nodes []P2PNodeResponse `json:"nodes"` -} - -type P2PNodeResponse struct { - Id string `json:"id"` - Uris []string `json:"uris"` -} - -type HTTPServiceInterface interface { +type HTTPService interface { GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router -} - -type HTTPService struct { - ServiceBase -} - -func NewHTTP(params ServiceParams) *HTTPService { - return &HTTPService{ - ServiceBase: ServiceBase{ - logger: params.Logger, - config: params.Config, - db: params.Db, - }, - } -} - -func (h *HTTPService) GetHttpRouter(inject map[string]jape.Handler) *httprouter.Router { - routes := map[string]jape.Handler{ - "GET /s5/version": h.versionHandler, - "GET /s5/p2p": h.p2pHandler, - "GET /s5/p2p/nodes": h.p2pNodesHandler, - } - - for k, v := range inject { - routes[k] = v - } - - return jape.Mux(routes) -} - -func (h *HTTPService) Start() error { - return nil -} - -func (h *HTTPService) Stop() error { - return nil -} - -func (h *HTTPService) Init() error { - return nil -} - -func (h *HTTPService) versionHandler(ctx jape.Context) { - _, _ = ctx.ResponseWriter.Write([]byte(build.Version)) -} -func (h *HTTPService) p2pHandler(ctx jape.Context) { - c, err := websocket.Accept(ctx.ResponseWriter, ctx.Request, nil) - if err != nil { - h.logger.Error("error accepting websocket connection", zap.Error(err)) - return - } - - peer, err := net.CreateTransportPeer("wss", &net.TransportPeerConfig{ - Socket: c, - Uris: []*url.URL{}, - }) - - if err != nil { - h.logger.Error("error creating transport peer", zap.Error(err)) - err := c.Close(websocket.StatusInternalError, "the sky is falling") - if err != nil { - h.logger.Error("error closing websocket connection", zap.Error(err)) - } - return - } - - h.services.P2P().ConnectionTracker().Add(1) - - go func() { - err := h.services.P2P().OnNewPeer(peer, false) - if err != nil { - h.logger.Error("error handling new peer", zap.Error(err)) - } - h.services.P2P().ConnectionTracker().Done() - }() -} - -func (h *HTTPService) p2pNodesHandler(ctx jape.Context) { - localId, err := h.services.P2P().NodeId().ToString() - - if ctx.Check("error getting local node id", err) != nil { - return - } - - uris := h.services.P2P().SelfConnectionUris() - - nodeList := make([]P2PNodeResponse, len(uris)) - - for i, uri := range uris { - nodeList[i] = P2PNodeResponse{ - Id: localId, - Uris: []string{uri.String()}, - } - } - - ctx.Encode(P2PNodesResponse{ - Nodes: nodeList, - }) + Service } diff --git a/service/p2p.go b/service/p2p.go index 2125e37..a4826df 100644 --- a/service/p2p.go +++ b/service/p2p.go @@ -1,41 +1,15 @@ package service import ( - "bytes" - "context" - "errors" - "fmt" - "git.lumeweb.com/LumeWeb/libs5-go/ed25519" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" - "git.lumeweb.com/LumeWeb/libs5-go/protocol" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/base" - "git.lumeweb.com/LumeWeb/libs5-go/protocol/signed" - "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/libs5-go/utils" - "github.com/vmihailenco/msgpack/v5" - bolt "go.etcd.io/bbolt" - "go.uber.org/zap" "net/url" - "sort" "sync" - "time" ) -var _ Service = (*P2PService)(nil) -var _ storage.StorageLocationProviderP2PService = (*P2PService)(nil) -var _ P2PServiceInterface = (*P2PService)(nil) - -var ( - errUnsupportedProtocol = errors.New("unsupported protocol") - errConnectionIdMissingNodeID = errors.New("connection id missing node id") -) - -const nodeBucketName = "nodes" - -type P2PServiceInterface interface { +type P2PService interface { SelfConnectionUris() []*url.URL Peers() structs.Map ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error @@ -53,650 +27,5 @@ type P2PServiceInterface interface { ConnectionTracker() *sync.WaitGroup NetworkId() string HashQueryRoutingTable() structs.Map -} - -type P2PService struct { - nodeKeyPair *ed25519.KeyPairEd25519 - localNodeID *encoding.NodeId - networkID string - nodesBucket *bolt.Bucket - inited bool - reconnectDelay structs.Map - peers structs.Map - peersPending structs.Map - selfConnectionUris []*url.URL - outgoingPeerBlocklist structs.Map - incomingPeerBlockList structs.Map - incomingIPBlocklist structs.Map - outgoingPeerFailures structs.Map - maxOutgoingPeerFailures uint - connections sync.WaitGroup - hashQueryRoutingTable structs.Map - ServiceBase -} - -func NewP2P(params ServiceParams) *P2PService { - uri, err := url.Parse(fmt.Sprintf("wss://%s:%d/s5/p2p", params.Config.HTTP.API.Domain, params.Config.HTTP.API.Port)) - if err != nil { - params.Logger.Fatal("failed to parse HTTP API URL", zap.Error(err)) - } - - service := &P2PService{ - nodeKeyPair: params.Config.KeyPair, - networkID: params.Config.P2P.Network, - inited: false, - reconnectDelay: structs.NewMap(), - peers: structs.NewMap(), - peersPending: structs.NewMap(), - selfConnectionUris: []*url.URL{uri}, - outgoingPeerBlocklist: structs.NewMap(), - incomingPeerBlockList: structs.NewMap(), - incomingIPBlocklist: structs.NewMap(), - outgoingPeerFailures: structs.NewMap(), - maxOutgoingPeerFailures: params.Config.P2P.MaxOutgoingPeerFailures, - hashQueryRoutingTable: structs.NewMap(), - ServiceBase: ServiceBase{ - logger: params.Logger, - config: params.Config, - db: params.Db, - }, - } - - return service -} - -func (p *P2PService) SelfConnectionUris() []*url.URL { - return p.selfConnectionUris -} - -func (p *P2PService) Peers() structs.Map { - return p.peers -} - -func (p *P2PService) Start() error { - config := p.config - if len(config.P2P.Peers.Initial) > 0 { - initialPeers := config.P2P.Peers.Initial - - for _, peer := range initialPeers { - u, err := url.Parse(peer) - if err != nil { - return err - } - - peer := peer - go func() { - err := p.ConnectToNode([]*url.URL{u}, false, nil) - if err != nil { - p.logger.Error("failed to connect to initial peer", zap.Error(err), zap.String("peer", peer)) - } - }() - } - } - - return nil -} - -func (p *P2PService) Stop() error { - return nil -} - -func (p *P2PService) Init() error { - if p.inited { - return nil - } - p.localNodeID = encoding.NewNodeId(p.nodeKeyPair.PublicKey()) - - err := utils.CreateBucket(nodeBucketName, p.db) - - if err != nil { - return err - } - - p.inited = true - - return nil -} -func (p *P2PService) ConnectToNode(connectionUris []*url.URL, retried bool, fromPeer net.Peer) error { - if !p.services.IsStarted() { - return nil - } - - unsupported, _ := url.Parse("http://0.0.0.0") - unsupported.Scheme = "unsupported" - - var connectionUri *url.URL - - for _, uri := range connectionUris { - if uri.Scheme == "ws" || uri.Scheme == "wss" { - connectionUri = uri - break - } - } - - if connectionUri == nil { - for _, uri := range connectionUris { - if uri.Scheme == "tcp" { - connectionUri = uri - break - } - } - } - - if connectionUri == nil { - connectionUri = unsupported - } - - if connectionUri.Scheme == "unsupported" { - return errUnsupportedProtocol - } - - scheme := connectionUri.Scheme - - if connectionUri.User == nil { - return errConnectionIdMissingNodeID - } - - username := connectionUri.User.Username() - id, err := encoding.DecodeNodeId(username) - if err != nil { - return err - } - - idString, err := id.ToString() - if err != nil { - return err - } - - if p.peersPending.Contains(idString) || p.peers.Contains(idString) { - p.logger.Debug("already connected", zap.String("node", connectionUri.String())) - return nil - } - - if p.outgoingPeerBlocklist.Contains(idString) { - p.logger.Debug("outgoing peer is on blocklist", zap.String("node", connectionUri.String())) - - var fromPeerId string - - if fromPeer != nil { - blocked := false - if fromPeer.Id() != nil { - fromPeerId, err = fromPeer.Id().ToString() - if err != nil { - return err - } - if !p.incomingPeerBlockList.Contains(fromPeerId) { - p.incomingPeerBlockList.Put(fromPeerId, true) - blocked = true - } - } - - fromPeerIP := fromPeer.GetIP() - - if !p.incomingIPBlocklist.Contains(fromPeerIP) { - p.incomingIPBlocklist.Put(fromPeerIP, true) - blocked = true - } - err = fromPeer.EndForAbuse() - if err != nil { - return err - } - - if blocked { - p.logger.Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP)) - } - } - return nil - } - - reconnectDelay := p.reconnectDelay.GetUInt(idString) - if reconnectDelay == nil { - delay := uint(1) - reconnectDelay = &delay - } - - if id.Equals(p.localNodeID) { - return nil - } - - p.logger.Debug("connect", zap.String("node", connectionUri.String())) - - socket, err := net.CreateTransportSocket(scheme, connectionUri) - if err != nil { - if retried { - p.logger.Error("failed to connect, too many retries", zap.String("node", connectionUri.String()), zap.Error(err)) - counter := uint(0) - if p.outgoingPeerFailures.Contains(idString) { - tmp := *p.outgoingPeerFailures.GetUInt(idString) - counter = tmp - } - - counter++ - - p.outgoingPeerFailures.PutUInt(idString, counter) - - if counter >= p.maxOutgoingPeerFailures { - - if fromPeer != nil { - blocked := false - var fromPeerId string - if fromPeer.Id() != nil { - fromPeerId, err = fromPeer.Id().ToString() - if err != nil { - return err - } - if !p.incomingPeerBlockList.Contains(fromPeerId) { - p.incomingPeerBlockList.Put(fromPeerId, true) - blocked = true - } - } - - fromPeerIP := fromPeer.GetIP() - if !p.incomingIPBlocklist.Contains(fromPeerIP) { - p.incomingIPBlocklist.Put(fromPeerIP, true) - blocked = true - } - err = fromPeer.EndForAbuse() - if err != nil { - return err - } - - if blocked { - p.logger.Debug("blocking peer for sending peer on blocklist", zap.String("node", connectionUri.String()), zap.String("peer", fromPeerId), zap.String("ip", fromPeerIP)) - } - } - p.outgoingPeerBlocklist.Put(idString, true) - p.logger.Debug("blocking peer for too many failures", zap.String("node", connectionUri.String())) - } - - return nil - } - retried = true - - p.logger.Error("failed to connect", zap.String("node", connectionUri.String()), zap.Error(err)) - - delay := p.reconnectDelay.GetUInt(idString) - if delay == nil { - tmp := uint(1) - delay = &tmp - } - delayDeref := *delay - p.reconnectDelay.PutUInt(idString, delayDeref*2) - - time.Sleep(time.Duration(delayDeref) * time.Second) - - return p.ConnectToNode(connectionUris, retried, fromPeer) - } - - if p.outgoingPeerFailures.Contains(idString) { - p.outgoingPeerFailures.Remove(idString) - } - - peer, err := net.CreateTransportPeer(scheme, &net.TransportPeerConfig{ - Socket: socket, - Uris: []*url.URL{connectionUri}, - }) - - if err != nil { - return err - } - - peer.SetId(id) - - p.services.P2P().ConnectionTracker().Add(1) - - peerId, err := peer.Id().ToString() - if err != nil { - return err - } - p.peersPending.Put(peerId, peer) - - go func() { - err := p.OnNewPeer(peer, true) - if err != nil && !peer.Abuser() { - p.logger.Error("peer error", zap.Error(err)) - } - p.services.P2P().ConnectionTracker().Done() - }() - - return nil - -} - -func (p *P2PService) OnNewPeer(peer net.Peer, verifyId bool) error { - var wg sync.WaitGroup - - var pid string - - if peer.Id() != nil { - pid, _ = peer.Id().ToString() - } else { - pid = "unknown" - } - - pip := peer.GetIP() - - if p.incomingIPBlocklist.Contains(pid) { - p.logger.Error("peer is on identity blocklist", zap.String("peer", pid)) - err := peer.EndForAbuse() - if err != nil { - return err - } - return nil - } - if p.incomingPeerBlockList.Contains(pip) { - p.logger.Debug("peer is on ip blocklist", zap.String("peer", pid), zap.String("ip", pip)) - err := peer.EndForAbuse() - if err != nil { - return err - } - return nil - } - - p.logger.Debug("OnNewPeer started", zap.String("peer", pid)) - - challenge := protocol.GenerateChallenge() - peer.SetChallenge(challenge) - - wg.Add(1) - go func() { - defer wg.Done() - p.OnNewPeerListen(peer, verifyId) - }() - - handshakeOpenMsg, err := msgpack.Marshal(protocol.NewHandshakeOpen(challenge, p.networkID)) - if err != nil { - return err - } - - err = peer.SendMessage(handshakeOpenMsg) - if err != nil { - return err - } - p.logger.Debug("OnNewPeer sent handshake", zap.String("peer", pid)) - - p.logger.Debug("OnNewPeer before Wait", zap.String("peer", pid)) - wg.Wait() // Wait for OnNewPeerListen goroutine to finish - p.logger.Debug("OnNewPeer ended", zap.String("peer", pid)) - return nil -} -func (p *P2PService) OnNewPeerListen(peer net.Peer, verifyId bool) { - onDone := net.CloseCallback(func() { - if peer.Id() != nil { - pid, err := peer.Id().ToString() - if err != nil { - p.logger.Error("failed to get peer id", zap.Error(err)) - return - } - // Handle closure of the connection - if p.peers.Contains(pid) { - p.peers.Remove(pid) - } - if p.peersPending.Contains(pid) { - p.peersPending.Remove(pid) - } - } - }) - - onError := net.ErrorCallback(func(args ...interface{}) { - if !peer.Abuser() { - p.logger.Error("peer error", zap.Any("args", args)) - } - }) - - peer.ListenForMessages(func(message []byte) error { - var reader base.IncomingMessageReader - - err := msgpack.Unmarshal(message, &reader) - if err != nil { - p.logger.Error("Error decoding basic message info", zap.Error(err)) - return err - } - - // Now, get the specific message handler based on the message kind - handler, ok := protocol.GetMessageType(reader.Kind) - if !ok { - p.logger.Error("Unknown message type", zap.Int("type", reader.Kind)) - return fmt.Errorf("unknown message type: %d", reader.Kind) - } - - data := base.IncomingMessageData{ - Original: message, - Data: reader.Data, - Ctx: context.Background(), - Peer: peer, - VerifyId: verifyId, - Config: p.config, - } - - dec := msgpack.NewDecoder(bytes.NewReader(reader.Data)) - - err = handler.DecodeMessage(dec, data) - if err != nil { - p.logger.Error("Error decoding message", zap.Error(err)) - return err - } - - // Directly decode and handle the specific message type - if err := handler.HandleMessage(data); err != nil { - p.logger.Error("Error handling message", zap.Error(err)) - return err - } - - return nil - }, net.ListenerOptions{ - OnClose: &onDone, - OnError: &onError, - Logger: p.logger, - }) -} - -func (p *P2PService) readNodeVotes(nodeId *encoding.NodeId) (NodeVotes, error) { - var value []byte - var found bool - err := p.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(nodeBucketName)) - if b == nil { - return fmt.Errorf("Bucket %s not found", nodeBucketName) - } - value = b.Get(nodeId.Raw()) - if value == nil { - return nil - } - found = true - return nil - }) - if err != nil { - return nil, err - } - if !found { - return NewNodeVotes(), nil - } - - var score NodeVotes - err = msgpack.Unmarshal(value, &score) - if err != nil { - return nil, err - } - - return score, nil -} - -func (p *P2PService) saveNodeVotes(nodeId *encoding.NodeId, votes NodeVotes) error { - // Marshal the votes into data - data, err := msgpack.Marshal(votes) - if err != nil { - return err - } - - // Use a transaction to save the data - err = p.db.Update(func(tx *bolt.Tx) error { - // Get or create the bucket - b := tx.Bucket([]byte(nodeBucketName)) - - // Put the data into the bucket - return b.Put(nodeId.Raw(), data) - }) - - return err -} - -func (p *P2PService) GetNodeScore(nodeId *encoding.NodeId) (float64, error) { - if nodeId.Equals(p.localNodeID) { - return 1, nil - } - - score, err := p.readNodeVotes(nodeId) - if err != nil { - return 0.5, err - } - - return protocol.CalculateNodeScore(score.Good(), score.Bad()), nil - -} -func (p *P2PService) SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) { - scores := make(map[encoding.NodeIdCode]float64) - var errOccurred error - - for _, nodeId := range nodes { - score, err := p.GetNodeScore(nodeId) - if err != nil { - errOccurred = err - scores[nodeId.HashCode()] = 0 // You may choose a different default value for error cases - } else { - scores[nodeId.HashCode()] = score - } - } - - sort.Slice(nodes, func(i, j int) bool { - return scores[nodes[i].HashCode()] > scores[nodes[j].HashCode()] - }) - - return nodes, errOccurred -} -func (p *P2PService) SignMessageSimple(message []byte) ([]byte, error) { - signedMessage := signed.NewSignedMessageRequest(message) - signedMessage.SetNodeId(p.localNodeID) - - err := signedMessage.Sign(p.config) - - if err != nil { - return nil, err - } - - result, err := msgpack.Marshal(signedMessage) - - if err != nil { - return nil, err - } - - return result, nil -} - -func (p *P2PService) AddPeer(peer net.Peer) error { - peerId, err := peer.Id().ToString() - if err != nil { - return err - } - p.peers.Put(peerId, peer) - p.reconnectDelay.PutUInt(peerId, 1) - - if p.peersPending.Contains(peerId) { - p.peersPending.Remove(peerId) - } - - return nil -} -func (p *P2PService) SendPublicPeersToPeer(peer net.Peer, peersToSend []net.Peer) error { - announceRequest := signed.NewAnnounceRequest(peer, peersToSend) - - message, err := msgpack.Marshal(announceRequest) - - if err != nil { - return err - } - - signedMessage, err := p.SignMessageSimple(message) - - if err != nil { - return err - } - - err = peer.SendMessage(signedMessage) - - return nil -} -func (p *P2PService) SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error { - hashRequest := protocol.NewHashRequest(hash, kinds) - message, err := msgpack.Marshal(hashRequest) - if err != nil { - return err - } - - for _, peer := range p.peers.Values() { - peerValue, ok := peer.(net.Peer) - if !ok { - p.logger.Error("failed to cast peer to net.Peer") - continue - } - err = peerValue.SendMessage(message) - } - - return nil -} - -func (p *P2PService) UpVote(nodeId *encoding.NodeId) error { - err := p.vote(nodeId, true) - if err != nil { - return err - } - - return nil -} - -func (p *P2PService) DownVote(nodeId *encoding.NodeId) error { - err := p.vote(nodeId, false) - if err != nil { - return err - } - - return nil -} - -func (p *P2PService) vote(nodeId *encoding.NodeId, upvote bool) error { - votes, err := p.readNodeVotes(nodeId) - if err != nil { - return err - } - - if upvote { - votes.Upvote() - } else { - votes.Downvote() - } - - err = p.saveNodeVotes(nodeId, votes) - if err != nil { - return err - } - - return nil -} -func (p *P2PService) NodeId() *encoding.NodeId { - return p.localNodeID -} - -func (p *P2PService) WaitOnConnectedPeers() { - p.connections.Wait() -} - -func (p *P2PService) ConnectionTracker() *sync.WaitGroup { - return &p.connections -} - -func (p *P2PService) NetworkId() string { - return p.config.P2P.Network -} -func (n *P2PService) HashQueryRoutingTable() structs.Map { - return n.hashQueryRoutingTable + Service } diff --git a/service/registry.go b/service/registry.go index f9cc8ea..d15370c 100644 --- a/service/registry.go +++ b/service/registry.go @@ -1,322 +1,15 @@ package service import ( - "errors" - "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/net" "git.lumeweb.com/LumeWeb/libs5-go/protocol" - "git.lumeweb.com/LumeWeb/libs5-go/structs" - "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/libs5-go/utils" - "github.com/olebedev/emitter" - "github.com/vmihailenco/msgpack/v5" - "go.etcd.io/bbolt" - "go.uber.org/zap" - "time" ) -const registryBucketName = "registry" - -var ( - _ Service = (*RegistryService)(nil) - _ RegistryServiceInterface = (*RegistryService)(nil) -) - -type RegistryServiceInterface interface { +type RegistryService interface { Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error SendRegistryRequest(pk []byte) error Get(pk []byte) (protocol.SignedRegistryEntry, error) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) -} - -type RegistryService struct { - streams structs.Map - subs structs.Map - ServiceBase -} - -func (r *RegistryService) Start() error { - return nil -} - -func (r *RegistryService) Stop() error { - return nil -} - -func (r *RegistryService) Init() error { - return utils.CreateBucket(registryBucketName, r.db) -} - -func NewRegistry(params ServiceParams) *RegistryService { - return &RegistryService{ - streams: structs.NewMap(), - subs: structs.NewMap(), - ServiceBase: ServiceBase{ - logger: params.Logger, - config: params.Config, - db: params.Db, - }, - } -} -func (r *RegistryService) Set(sre protocol.SignedRegistryEntry, trusted bool, receivedFrom net.Peer) error { - hash := encoding.NewMultihash(sre.PK()) - hashString, err := hash.ToString() - if err != nil { - return err - } - pid, err := receivedFrom.Id().ToString() - if err != nil { - return err - } - r.logger.Debug("[registry] set", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid)) - - if !trusted { - if len(sre.PK()) != 33 { - return errors.New("Invalid pubkey") - } - if int(sre.PK()[0]) != int(types.HashTypeEd25519) { - return errors.New("Only ed25519 keys are supported") - } - if sre.Revision() < 0 || sre.Revision() > 281474976710656 { - return errors.New("Invalid revision") - } - if len(sre.Data()) > types.RegistryMaxDataSize { - return errors.New("Data too long") - } - - if !sre.Verify() { - return errors.New("Invalid signature found") - } - } - - existingEntry, err := r.getFromDB(sre.PK()) - if err != nil { - return err - } - - if existingEntry != nil { - if receivedFrom != nil { - if existingEntry.Revision() == sre.Revision() { - return nil - } else if existingEntry.Revision() > sre.Revision() { - updateMessage := protocol.MarshalSignedRegistryEntry(existingEntry) - err := receivedFrom.SendMessage(updateMessage) - if err != nil { - return err - } - return nil - } - } - - if existingEntry.Revision() >= sre.Revision() { - return errors.New("Revision number too low") - } - } - - key := encoding.NewMultihash(sre.PK()) - keyString, err := key.ToString() - if err != nil { - return err - } - - eventObj, ok := r.streams.Get(keyString) - if ok { - event := eventObj.(*emitter.Emitter) - go event.Emit("fire", sre) - } - - err = r.db.Update(func(txn *bbolt.Tx) error { - bucket := txn.Bucket([]byte(registryBucketName)) - err := bucket.Put(sre.PK(), protocol.MarshalSignedRegistryEntry(sre)) - if err != nil { - return err - } - return nil - }) - - if err != nil { - return err - } - - err = r.BroadcastEntry(sre, receivedFrom) - if err != nil { - return err - } - - return nil -} -func (r *RegistryService) BroadcastEntry(sre protocol.SignedRegistryEntry, receivedFrom net.Peer) error { - hash := encoding.NewMultihash(sre.PK()) - hashString, err := hash.ToString() - if err != nil { - return err - } - pid, err := receivedFrom.Id().ToString() - if err != nil { - return err - } - r.logger.Debug("[registry] broadcastEntry", zap.String("pk", hashString), zap.Uint64("revision", sre.Revision()), zap.String("receivedFrom", pid)) - updateMessage := protocol.MarshalSignedRegistryEntry(sre) - - for _, p := range r.services.P2P().Peers().Values() { - peer, ok := p.(net.Peer) - if !ok { - continue - } - if receivedFrom == nil || peer.Id().Equals(receivedFrom.Id()) { - err := peer.SendMessage(updateMessage) - if err != nil { - pid, err := peer.Id().ToString() - if err != nil { - return err - } - r.logger.Error("Failed to send registry broadcast", zap.String("peer", pid), zap.Error(err)) - return err - } - } - } - - return nil -} -func (r *RegistryService) SendRegistryRequest(pk []byte) error { - query := protocol.NewRegistryQuery(pk) - - request, err := msgpack.Marshal(query) - if err != nil { - return err - } - - // Iterate over peers and send the request - for _, peerVal := range r.services.P2P().Peers().Values() { - peer, ok := peerVal.(net.Peer) - if !ok { - continue - } - err := peer.SendMessage(request) - if err != nil { - pid, err := peer.Id().ToString() - if err != nil { - return err - } - r.logger.Error("Failed to send registry request", zap.String("peer", pid), zap.Error(err)) - return err - } - } - - return nil -} -func (r *RegistryService) Get(pk []byte) (protocol.SignedRegistryEntry, error) { - key := encoding.NewMultihash(pk) - keyString, err := key.ToString() - if err != nil { - return nil, err - } - - if r.subs.Contains(keyString) { - r.logger.Debug("[registry] get (cached)", zap.String("key", keyString)) - res, err := r.getFromDB(pk) - if err != nil { - return nil, err - } - if res != nil { - return res, nil - } - - err = r.SendRegistryRequest(pk) - if err != nil { - return nil, err - } - time.Sleep(200 * time.Millisecond) - return r.getFromDB(pk) - } - - err = r.SendRegistryRequest(pk) - if err != nil { - return nil, err - } - r.subs.Put(keyString, key) - if !r.streams.Contains(keyString) { - event := &emitter.Emitter{} - r.streams.Put(keyString, event) - } - - res, err := r.getFromDB(pk) - if err != nil { - return nil, err - } - - if res != nil { - return res, nil - } - - if res == nil { - r.logger.Debug("[registry] get (cached)", zap.String("key", keyString)) - for i := 0; i < 200; i++ { - time.Sleep(10 * time.Millisecond) - res, err := r.getFromDB(pk) - if err != nil { - return nil, err - } - if res != nil { - break - } - } - } - - return nil, nil -} - -func (r *RegistryService) Listen(pk []byte, cb func(sre protocol.SignedRegistryEntry)) (func(), error) { - key, err := encoding.NewMultihash(pk).ToString() - if err != nil { - return nil, err - } - - cbProxy := func(event *emitter.Event) { - sre, ok := event.Args[0].(protocol.SignedRegistryEntry) - if !ok { - r.logger.Error("Failed to cast event to SignedRegistryEntry") - return - } - - cb(sre) - } - - if !r.streams.Contains(key) { - em := emitter.New(0) - r.streams.Put(key, em) - err := r.SendRegistryRequest(pk) - if err != nil { - return nil, err - } - } - streamVal, _ := r.streams.Get(key) - stream := streamVal.(*emitter.Emitter) - channel := stream.On("fire", cbProxy) - - return func() { - stream.Off("fire", channel) - }, nil -} - -func (r *RegistryService) getFromDB(pk []byte) (sre protocol.SignedRegistryEntry, err error) { - err = r.db.View(func(txn *bbolt.Tx) error { - bucket := txn.Bucket([]byte(registryBucketName)) - val := bucket.Get(pk) - if val != nil { - entry, err := protocol.UnmarshalSignedRegistryEntry(val) - if err != nil { - return err - } - sre = entry - return nil - } - return nil - }) - if err != nil { - return nil, err - } - - return sre, nil + Service } diff --git a/service/service.go b/service/service.go index 2d6e86d..10d4b59 100644 --- a/service/service.go +++ b/service/service.go @@ -2,7 +2,7 @@ package service import ( "git.lumeweb.com/LumeWeb/libs5-go/config" - bolt "go.etcd.io/bbolt" + "go.etcd.io/bbolt" "go.uber.org/zap" ) @@ -11,12 +11,15 @@ type Service interface { Stop() error Init() error SetServices(services Services) + Logger() *zap.Logger + Config() *config.NodeConfig + Db() *bbolt.DB } type Services interface { - P2P() P2PServiceInterface - Registry() RegistryServiceInterface - HTTP() HTTPServiceInterface - Storage() StorageServiceInterface + P2P() P2PService + Registry() RegistryService + HTTP() HTTPService + Storage() StorageService All() []Service IsStarted() bool Start() error @@ -26,16 +29,32 @@ type Services interface { type ServiceParams struct { Logger *zap.Logger Config *config.NodeConfig - Db *bolt.DB + Db *bbolt.DB } type ServiceBase struct { logger *zap.Logger config *config.NodeConfig - db *bolt.DB + db *bbolt.DB services Services } +func NewServiceBase(logger *zap.Logger, config *config.NodeConfig, db *bbolt.DB) ServiceBase { + return ServiceBase{logger: logger, config: config, db: db} +} + func (s *ServiceBase) SetServices(services Services) { s.services = services } +func (s *ServiceBase) Services() Services { + return s.services +} +func (s *ServiceBase) Logger() *zap.Logger { + return s.logger +} +func (s *ServiceBase) Config() *config.NodeConfig { + return s.config +} +func (s *ServiceBase) Db() *bbolt.DB { + return s.db +} diff --git a/service/storage.go b/service/storage.go index 2f3533a..8f71b91 100644 --- a/service/storage.go +++ b/service/storage.go @@ -1,31 +1,13 @@ package service import ( - "errors" - "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/storage" - "git.lumeweb.com/LumeWeb/libs5-go/storage/provider" - "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/libs5-go/utils" - "github.com/go-resty/resty/v2" - "github.com/vmihailenco/msgpack/v5" - "go.etcd.io/bbolt" - "go.uber.org/zap" - "time" ) -const cacheBucketName = "object-cache" - -var ( - _ Service = (*StorageService)(nil) - _ storage.StorageLocationProviderStorageService = (*StorageService)(nil) - _ StorageServiceInterface = (*StorageService)(nil) -) - -type StorageServiceInterface interface { +type StorageService interface { SetProviderStore(store storage.ProviderStore) ProviderStore() storage.ProviderStore GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) @@ -33,279 +15,5 @@ type StorageServiceInterface interface { DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) DownloadBytesByCID(cid *encoding.CID) ([]byte, error) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error) -} - -type StorageService struct { - httpClient *resty.Client - metadataCache structs.Map - providerStore storage.ProviderStore - ServiceBase -} - -func NewStorage(params ServiceParams) *StorageService { - return &StorageService{ - httpClient: resty.New(), - metadataCache: structs.NewMap(), - ServiceBase: ServiceBase{ - logger: params.Logger, - config: params.Config, - db: params.Db, - }, - } -} - -func (s *StorageService) Start() error { - err := - utils.CreateBucket(cacheBucketName, s.db) - - if err != nil { - return err - } - - return nil -} - -func (s *StorageService) Stop() error { - return nil -} - -func (s *StorageService) Init() error { - return nil -} - -func (n *StorageService) SetProviderStore(store storage.ProviderStore) { - n.providerStore = store -} - -func (n *StorageService) ProviderStore() storage.ProviderStore { - return n.providerStore -} - -func (s *StorageService) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { - locations := make(map[string]storage.StorageLocation) - - locationMap, err := s.readStorageLocationsFromDB(hash) - if err != nil { - return nil, err - } - if len(locationMap) == 0 { - return make(map[string]storage.StorageLocation), nil - } - - ts := time.Now().Unix() - - for _, t := range kinds { - nodeMap, ok := (locationMap)[int(t)] - if !ok { - continue - } - - for key, value := range nodeMap { - expiry, ok := value[3].(int64) - if !ok || expiry < ts { - continue - } - - addressesInterface, ok := value[1].([]interface{}) - if !ok { - continue - } - - // Create a slice to hold the strings - addresses := make([]string, len(addressesInterface)) - - // Convert each element to string - for i, v := range addressesInterface { - str, ok := v.(string) - if !ok { - // Handle the error, maybe skip this element or set a default value - continue - } - addresses[i] = str - } - - storageLocation := storage.NewStorageLocation(int(t), addresses, expiry) - - if providerMessage, ok := value[4].([]byte); ok { - (storageLocation).SetProviderMessage(providerMessage) - } - - locations[key] = storageLocation - } - } - return locations, nil -} -func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { - var locationMap storage.StorageLocationMap - - err := s.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name - if b == nil { - return fmt.Errorf("bucket %s not found", cacheBucketName) - } - - bytes := b.Get(hash.FullBytes()) - if bytes == nil { - // If no data found, return an empty locationMap but no error - locationMap = storage.NewStorageLocationMap() - return nil - } - - return msgpack.Unmarshal(bytes, &locationMap) - }) - - if err != nil { - return nil, err - } - - return locationMap, nil -} - -func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error { - // Read existing storage locations - locationDb, err := s.readStorageLocationsFromDB(hash) - if err != nil { - return err - } - - nodeIdStr, err := nodeId.ToString() - if err != nil { - return err - } - - // Get or create the inner map for the specific type - innerMap, exists := locationDb[location.Type()] - if !exists { - innerMap = make(storage.NodeStorage, 1) - innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1) - } - - // Create location map with new data - locationMap := make(map[int]interface{}, 3) - locationMap[1] = location.Parts() - locationMap[3] = location.Expiry() - locationMap[4] = message - - // Update the inner map with the new location - innerMap[nodeIdStr] = locationMap - locationDb[location.Type()] = innerMap - - // Serialize the updated map and store it in the database - packedBytes, err := msgpack.Marshal(locationDb) - if err != nil { - return err - } - err = s.db.Update(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(cacheBucketName)) - - return b.Put(hash.FullBytes(), packedBytes) - }) - if err != nil { - return err - } - - return nil -} - -func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { - // Initialize the download URI provider - dlUriProvider := provider.NewStorageLocationProvider(provider.StorageLocationProviderParams{ - Services: storage.NewStorageLocationProviderServices(s.services.P2P(), s.services.Storage()), - Hash: hash, - LocationTypes: []types.StorageLocationType{ - types.StorageLocationTypeFull, - types.StorageLocationTypeFile, - }, - Logger: s.logger, - Config: s.config, - Db: s.db, - }) - err := dlUriProvider.Start() - if err != nil { - return nil, err - } - - retryCount := 0 - for { - dlUri, err := dlUriProvider.Next() - if err != nil { - return nil, err - } - - s.logger.Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL())) - - res, err := s.httpClient.R().Get(dlUri.Location().BytesURL()) - if err != nil { - err := dlUriProvider.Downvote(dlUri) - if err != nil { - return nil, err - } - retryCount++ - if retryCount > 32 { - return nil, errors.New("too many retries") - } - continue - } - - bodyBytes := res.Body() - - return bodyBytes, nil - } -} - -func (s *StorageService) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) { - bytes, err = s.DownloadBytesByHash(&cid.Hash) - if err != nil { - return nil, err - } - - return bytes, nil -} - -func (s *StorageService) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) { - hashStr, err := cid.Hash.ToString() - if err != nil { - return nil, err - } - - if s.metadataCache.Contains(hashStr) { - md, _ := s.metadataCache.Get(hashStr) - - return md.(metadata.Metadata), nil - } - - bytes, err := s.DownloadBytesByHash(&cid.Hash) - if err != nil { - return nil, err - } - - switch cid.Type { - case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method - md = metadata.NewEmptyMediaMetadata() - - err = msgpack.Unmarshal(bytes, md) - if err != nil { - return nil, err - } - case types.CIDTypeMetadataWebapp: - md = metadata.NewEmptyWebAppMetadata() - - err = msgpack.Unmarshal(bytes, md) - if err != nil { - return nil, err - } - case types.CIDTypeDirectory: - md = metadata.NewEmptyDirectoryMetadata() - - err = msgpack.Unmarshal(bytes, md) - if err != nil { - return nil, err - } - default: - return nil, errors.New("unsupported metadata format") - } - - s.metadataCache.Put(hashStr, md) - - return md, nil + Service } diff --git a/storage/provider/provider.go b/storage/provider/provider.go index 7556b33..4401aa5 100644 --- a/storage/provider/provider.go +++ b/storage/provider/provider.go @@ -5,6 +5,7 @@ import ( "fmt" "git.lumeweb.com/LumeWeb/libs5-go/config" "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/types" bolt "go.etcd.io/bbolt" @@ -16,7 +17,7 @@ import ( var _ storage.StorageLocationProvider = (*StorageLocationProviderImpl)(nil) type StorageLocationProviderImpl struct { - services storage.StorageLocationProviderServices + services service.Services hash *encoding.Multihash types []types.StorageLocationType timeoutDuration time.Duration @@ -193,10 +194,11 @@ func containsNode(slice []*encoding.NodeId, item *encoding.NodeId) bool { } type StorageLocationProviderParams struct { - Services storage.StorageLocationProviderServices + Services service.Services Hash *encoding.Multihash LocationTypes []types.StorageLocationType Logger *zap.Logger Config *config.NodeConfig Db *bolt.DB + service.ServiceParams } diff --git a/storage/storage.go b/storage/storage.go index 0ddf8d8..8c96454 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,7 +3,6 @@ package storage import ( "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/types" "github.com/vmihailenco/msgpack/v5" "strconv" "time" @@ -128,34 +127,3 @@ type StorageLocationProvider interface { Upvote(uri SignedStorageLocation) error Downvote(uri SignedStorageLocation) error } - -type StorageLocationProviderServices interface { - P2P() StorageLocationProviderP2PService - Storage() StorageLocationProviderStorageService -} -type StorageLocationProviderP2PService interface { - SortNodesByScore(nodes []*encoding.NodeId) ([]*encoding.NodeId, error) - SendHashRequest(hash *encoding.Multihash, kinds []types.StorageLocationType) error - UpVote(nodeId *encoding.NodeId) error - DownVote(nodeId *encoding.NodeId) error -} -type StorageLocationProviderStorageService interface { - GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]StorageLocation, error) -} - -type StorageLocationProviderServicesImpl struct { - p2p StorageLocationProviderP2PService - storage StorageLocationProviderStorageService -} - -func NewStorageLocationProviderServices(p2p StorageLocationProviderP2PService, storage StorageLocationProviderStorageService) *StorageLocationProviderServicesImpl { - return &StorageLocationProviderServicesImpl{p2p: p2p, storage: storage} -} - -func (s *StorageLocationProviderServicesImpl) P2P() StorageLocationProviderP2PService { - return s.p2p -} - -func (s *StorageLocationProviderServicesImpl) Storage() StorageLocationProviderStorageService { - return s.storage -}