feat: implement DownloadBytesByHash and GetMetadataByCID
This commit is contained in:
parent
ee20d2a560
commit
18bc518dad
121
node/node.go
121
node/node.go
|
@ -1,17 +1,21 @@
|
||||||
package node
|
package node
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/config"
|
"git.lumeweb.com/LumeWeb/libs5-go/config"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
|
"git.lumeweb.com/LumeWeb/libs5-go/interfaces"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/service"
|
"git.lumeweb.com/LumeWeb/libs5-go/service"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/storage"
|
"git.lumeweb.com/LumeWeb/libs5-go/storage"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/structs"
|
"git.lumeweb.com/LumeWeb/libs5-go/structs"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/utils"
|
"git.lumeweb.com/LumeWeb/libs5-go/utils"
|
||||||
|
"github.com/go-resty/resty/v2"
|
||||||
"github.com/vmihailenco/msgpack/v5"
|
"github.com/vmihailenco/msgpack/v5"
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -26,6 +30,7 @@ type NodeImpl struct {
|
||||||
hashQueryRoutingTable structs.Map
|
hashQueryRoutingTable structs.Map
|
||||||
services interfaces.Services
|
services interfaces.Services
|
||||||
cacheBucket *bolt.Bucket
|
cacheBucket *bolt.Bucket
|
||||||
|
httpClient *resty.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NodeImpl) NetworkId() string {
|
func (n *NodeImpl) NetworkId() string {
|
||||||
|
@ -42,6 +47,7 @@ func NewNode(config *config.NodeConfig) interfaces.Node {
|
||||||
metadataCache: structs.NewMap(),
|
metadataCache: structs.NewMap(),
|
||||||
started: false,
|
started: false,
|
||||||
hashQueryRoutingTable: structs.NewMap(),
|
hashQueryRoutingTable: structs.NewMap(),
|
||||||
|
httpClient: resty.New(),
|
||||||
}
|
}
|
||||||
n.services = NewServices(service.NewP2P(n))
|
n.services = NewServices(service.NewP2P(n))
|
||||||
|
|
||||||
|
@ -98,25 +104,6 @@ func (n *NodeImpl) Start() error {
|
||||||
n.started = true
|
n.started = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func (n *NodeImpl) Services() *S5Services {
|
|
||||||
if n.nodeConfig != nil {
|
|
||||||
return n.nodeConfig.Services
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NodeImpl) Start() error {
|
|
||||||
n.started = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *NodeImpl) Stop() error {
|
|
||||||
n.started = false
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]interfaces.StorageLocation, error) {
|
func (n *NodeImpl) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]interfaces.StorageLocation, error) {
|
||||||
locations := make(map[string]interfaces.StorageLocation)
|
locations := make(map[string]interfaces.StorageLocation)
|
||||||
|
|
||||||
|
@ -220,11 +207,15 @@ func (n *NodeImpl) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
} /*
|
}
|
||||||
|
|
||||||
func (n *NodeImpl) DownloadBytesByHash(hash Multihash) ([]byte, error) {
|
func (n *NodeImpl) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) {
|
||||||
dlUriProvider := NewStorageLocationProvider(n, hash, []int{storageLocationTypeFull, storageLocationTypeFile})
|
// Initialize the download URI provider
|
||||||
dlUriProvider.Start()
|
dlUriProvider := storage.NewStorageLocationProvider(n, hash)
|
||||||
|
err := dlUriProvider.Start()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
retryCount := 0
|
retryCount := 0
|
||||||
for {
|
for {
|
||||||
|
@ -233,69 +224,63 @@ func (n *NodeImpl) DownloadBytesByHash(hash Multihash) ([]byte, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
n.Logger.Verbose(fmt.Sprintf("[try] %s", dlUri.Location.BytesUrl))
|
// Log the attempt
|
||||||
|
log.Printf("[try] %s", dlUri.Location().BytesURL())
|
||||||
|
|
||||||
client := &http.Client{
|
res, err := n.httpClient.R().Get(dlUri.Location().BytesURL())
|
||||||
Timeout: 30 * time.Second,
|
|
||||||
}
|
|
||||||
res, err := client.Get(dlUri.Location.BytesUrl)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.Logger.Catched(err)
|
err := dlUriProvider.Downvote(dlUri)
|
||||||
|
if err != nil {
|
||||||
dlUriProvider.Downvote(dlUri)
|
return nil, err
|
||||||
|
}
|
||||||
retryCount++
|
retryCount++
|
||||||
if retryCount > 32 {
|
if retryCount > 32 {
|
||||||
return nil, errors.New("too many retries")
|
return nil, errors.New("too many retries")
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
|
||||||
|
|
||||||
data, err := ioutil.ReadAll(res.Body)
|
bodyBytes := res.Body()
|
||||||
|
|
||||||
|
return bodyBytes, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *NodeImpl) GetMetadataByCID(cid *encoding.CID) (metadata.Metadata, error) {
|
||||||
|
var md metadata.Metadata
|
||||||
|
|
||||||
|
hashStr, err := cid.Hash.ToString()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if n.metadataCache.Contains(hashStr) {
|
||||||
|
bytes, err := n.DownloadBytesByHash(&cid.Hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assuming blake3 and equalBytes functions are available
|
switch cid.Type {
|
||||||
resHash := blake3(data)
|
case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method
|
||||||
|
md = metadata.NewEmptyMediaMetadata()
|
||||||
|
|
||||||
if !equalBytes(hash.HashBytes, resHash) {
|
err = msgpack.Unmarshal(bytes, md)
|
||||||
dlUriProvider.Downvote(dlUri)
|
if err != nil {
|
||||||
continue
|
return nil, err
|
||||||
}
|
}
|
||||||
|
case types.CIDTypeMetadataWebapp:
|
||||||
|
md = metadata.NewEmptyWebAppMetadata()
|
||||||
|
|
||||||
dlUriProvider.Upvote(dlUri)
|
err = msgpack.Unmarshal(bytes, md)
|
||||||
return data, nil
|
if err != nil {
|
||||||
}
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NodeImpl) GetMetadataByCID(cid CID) (Metadata, error) {
|
|
||||||
var metadata Metadata
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
if metadata, ok = n.MetadataCache[cid.Hash]; !ok {
|
|
||||||
bytes, err := n.DownloadBytesByHash(cid.Hash)
|
|
||||||
if err != nil {
|
|
||||||
return Metadata{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch cid.kind {
|
|
||||||
case METADATA_MEDIA, BRIDGE: // Both cases use the same deserialization method
|
|
||||||
metadata, err = deserializeMediaMetadata(bytes)
|
|
||||||
case METADATA_WEBAPP:
|
|
||||||
metadata, err = deserializeWebAppMetadata(bytes)
|
|
||||||
default:
|
default:
|
||||||
return Metadata{}, errors.New("unsupported metadata format")
|
return nil, errors.New("unsupported metadata format")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
n.metadataCache.Put(hashStr, md)
|
||||||
return Metadata{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
n.MetadataCache[cid.Hash] = metadata
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return metadata, nil
|
return md, nil
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
Loading…
Reference in New Issue