package _default import ( "context" "errors" "fmt" "git.lumeweb.com/LumeWeb/libs5-go/encoding" "git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/service" "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/storage/provider" "git.lumeweb.com/LumeWeb/libs5-go/structs" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/libs5-go/utils" "github.com/ddo/rq" _ "github.com/ddo/rq" "github.com/vmihailenco/msgpack/v5" "go.etcd.io/bbolt" "go.uber.org/zap" "io" "net/http" "time" ) const cacheBucketName = "object-cache" var ( _ service.Service = (*StorageService)(nil) _ service.StorageService = (*StorageService)(nil) ) var ( ErrUnsupportedMetaFormat = errors.New("unsupported metadata format") ) type StorageService struct { metadataCache structs.Map providerStore storage.ProviderStore service.ServiceBase } func NewStorage(params service.ServiceParams) *StorageService { return &StorageService{ metadataCache: structs.NewMap(), ServiceBase: service.NewServiceBase(params.Logger, params.Config, params.Db), } } func (s *StorageService) Start(ctx context.Context) error { err := utils.CreateBucket(cacheBucketName, s.Db()) if err != nil { return err } return nil } func (s *StorageService) Stop(ctx context.Context) error { return nil } func (s *StorageService) Init(ctx context.Context) error { return nil } func (n *StorageService) SetProviderStore(store storage.ProviderStore) { n.providerStore = store } func (n *StorageService) ProviderStore() storage.ProviderStore { return n.providerStore } func (s *StorageService) GetCachedStorageLocations(hash *encoding.Multihash, kinds []types.StorageLocationType) (map[string]storage.StorageLocation, error) { locations := make(map[string]storage.StorageLocation) locationMap, err := s.readStorageLocationsFromDB(hash) if err != nil { return nil, err } local := s.getLocalStorageLocation(hash, kinds) if local != nil { nodeIDStr, err := s.Services().P2P().NodeId().ToString() if err != nil { return nil, err } locations[nodeIDStr] = local } if len(locationMap) == 0 { return locations, nil } ts := time.Now().Unix() for _, t := range kinds { nodeMap, ok := (locationMap)[int(t)] if !ok { continue } for key, value := range nodeMap { expiry, ok := value[3].(int64) if !ok || expiry < ts { continue } addressesInterface, ok := value[1].([]interface{}) if !ok { continue } // Create a slice to hold the strings addresses := make([]string, len(addressesInterface)) // Convert each element to string for i, v := range addressesInterface { str, ok := v.(string) if !ok { // Handle the error, maybe skip this element or set a default value continue } addresses[i] = str } storageLocation := storage.NewStorageLocation(int(t), addresses, expiry) if providerMessage, ok := value[4].([]byte); ok { (storageLocation).SetProviderMessage(providerMessage) } locations[key] = storageLocation } } return locations, nil } func (s *StorageService) getLocalStorageLocation(hash *encoding.Multihash, kinds []types.StorageLocationType) storage.StorageLocation { if s.ProviderStore() != nil { if s.ProviderStore().CanProvide(hash, kinds) { location, _ := s.ProviderStore().Provide(hash, kinds) message := storage.PrepareProvideMessage(s.Services().P2P().Config().KeyPair, hash, location) location.SetProviderMessage(message) return location } } return nil } func (s *StorageService) readStorageLocationsFromDB(hash *encoding.Multihash) (storage.StorageLocationMap, error) { var locationMap storage.StorageLocationMap err := s.Db().View(func(tx *bbolt.Tx) error { b := tx.Bucket([]byte(cacheBucketName)) // Replace with your actual bucket name if b == nil { return fmt.Errorf("bucket %s not found", cacheBucketName) } bytes := b.Get(hash.FullBytes()) if bytes == nil { // If no data found, return an empty locationMap but no error locationMap = storage.NewStorageLocationMap() return nil } return msgpack.Unmarshal(bytes, &locationMap) }) if err != nil { return nil, err } return locationMap, nil } func (s *StorageService) AddStorageLocation(hash *encoding.Multihash, nodeId *encoding.NodeId, location storage.StorageLocation, message []byte) error { // Read existing storage locations locationDb, err := s.readStorageLocationsFromDB(hash) if err != nil { return err } nodeIdStr, err := nodeId.ToString() if err != nil { return err } // Get or create the inner map for the specific type innerMap, exists := locationDb[location.Type()] if !exists { innerMap = make(storage.NodeStorage, 1) innerMap[nodeIdStr] = make(storage.NodeDetailsStorage, 1) } // Create location map with new data locationMap := make(map[int]interface{}, 3) locationMap[1] = location.Parts() locationMap[3] = location.Expiry() locationMap[4] = message // Update the inner map with the new location innerMap[nodeIdStr] = locationMap locationDb[location.Type()] = innerMap // Serialize the updated map and store it in the database packedBytes, err := msgpack.Marshal(locationDb) if err != nil { return err } err = s.Db().Update(func(tx *bbolt.Tx) error { b := tx.Bucket([]byte(cacheBucketName)) return b.Put(hash.FullBytes(), packedBytes) }) if err != nil { return err } return nil } func (s *StorageService) DownloadBytesByHash(hash *encoding.Multihash) ([]byte, error) { // Initialize the download URI provider dlUriProvider := provider.NewStorageLocationProvider(provider.StorageLocationProviderParams{ Services: s.Services(), Hash: hash, LocationTypes: []types.StorageLocationType{ types.StorageLocationTypeFull, types.StorageLocationTypeFile, }, ServiceParams: service.ServiceParams{ Logger: s.Logger(), Config: s.Config(), Db: s.Db(), }, }) err := dlUriProvider.Start() if err != nil { return nil, err } retryCount := 0 for { dlUri, err := dlUriProvider.Next() if err != nil { return nil, err } s.Logger().Debug("Trying to download from", zap.String("url", dlUri.Location().BytesURL())) req := rq.Get(dlUri.Location().BytesURL()) httpReq, err := req.ParseRequest() if err != nil { return nil, err } res, err := http.DefaultClient.Do(httpReq) if err != nil { err := dlUriProvider.Downvote(dlUri) if err != nil { return nil, err } retryCount++ if retryCount > 32 { return nil, errors.New("too many retries") } continue } defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { s.Logger().Error("error closing body", zap.Error(err)) } }(res.Body) if res.StatusCode != 200 { err := dlUriProvider.Downvote(dlUri) if err != nil { return nil, err } continue } bodyBytes, err := io.ReadAll(res.Body) if err != nil { return nil, err } return bodyBytes, nil } } func (s *StorageService) DownloadBytesByCID(cid *encoding.CID) (bytes []byte, err error) { bytes, err = s.DownloadBytesByHash(&cid.Hash) if err != nil { return nil, err } return bytes, nil } func (s *StorageService) GetMetadataByCID(cid *encoding.CID) (md metadata.Metadata, err error) { hashStr, err := cid.Hash.ToString() if err != nil { return nil, err } if s.metadataCache.Contains(hashStr) { md, _ := s.metadataCache.Get(hashStr) return md.(metadata.Metadata), nil } bytes, err := s.DownloadBytesByHash(&cid.Hash) if err != nil { return nil, err } switch cid.Type { case types.CIDTypeMetadataMedia, types.CIDTypeBridge: // Both cases use the same deserialization method md = metadata.NewEmptyMediaMetadata() err = msgpack.Unmarshal(bytes, md) if err != nil { return nil, err } case types.CIDTypeMetadataWebapp: md = metadata.NewEmptyWebAppMetadata() err = msgpack.Unmarshal(bytes, md) if err != nil { return nil, err } case types.CIDTypeDirectory: md = metadata.NewEmptyDirectoryMetadata() err = msgpack.Unmarshal(bytes, md) if err != nil { return nil, err } default: return nil, ErrUnsupportedMetaFormat } s.metadataCache.Put(hashStr, md) return md, nil }