feat: add support for recursively pinning a manifest and all its children

This commit is contained in:
Derrick Hammer 2024-02-29 12:02:49 -05:00
parent 4126c06cd8
commit b3df326980
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 236 additions and 89 deletions

View File

@ -15,6 +15,7 @@ import (
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"net/url" "net/url"
"slices"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -793,6 +794,102 @@ func (s *S5API) accountPinDelete(jc jape.Context) {
jc.ResponseWriter.WriteHeader(http.StatusNoContent) jc.ResponseWriter.WriteHeader(http.StatusNoContent)
} }
func (s *S5API) getManifestCids(cid *encoding.CID) ([]*encoding.CID, error) {
var cids []*encoding.CID
manifest, err := s.getNode().Services().Storage().GetMetadataByCID(cid)
if err != nil {
return nil, err
}
cids = append(cids, cid)
switch cid.Type {
case types.CIDTypeMetadataMedia:
media := manifest.(*s5libmetadata.MediaMetadata)
for _, mediaType := range media.MediaTypes {
lo.ForEach(mediaType, func(format s5libmetadata.MediaFormat, _i int) {
cids = append(cids, format.Cid)
})
}
case types.CIDTypeDirectory:
dir := manifest.(*s5libmetadata.DirectoryMetadata)
lo.ForEach(lo.Values(dir.Directories.Items()), func(d *s5libmetadata.DirectoryReference, _i int) {
entry, err := s.getNode().Services().Registry().Get(d.PublicKey)
if err != nil {
s.logger.Error("Error getting registry entry", zap.Error(err))
return
}
cid, err := encoding.CIDFromRegistry(entry.Data())
if err != nil {
s.logger.Error("Error getting CID from registry entry", zap.Error(err))
return
}
childCids, err := s.getManifestCids(cid)
if err != nil {
s.logger.Error("Error getting child manifest CIDs", zap.Error(err))
return
}
cids = append(cids, childCids...)
})
lo.ForEach(lo.Values(dir.Files.Items()), func(f *s5libmetadata.FileReference, _i int) {
cids = append(cids, f.File.CID())
})
case types.CIDTypeMetadataWebapp:
webapp := manifest.(*s5libmetadata.WebAppMetadata)
lo.ForEach(lo.Values(webapp.Paths), func(f s5libmetadata.WebAppMetadataFileReference, _i int) {
cids = append(cids, f.Cid)
})
}
return cids, nil
}
func (s *S5API) accountPinManifest(jc jape.Context, userId uint, cid *encoding.CID) {
type pinResult struct {
success bool
error error
}
cids, err := s.getManifestCids(cid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
var results map[string]pinResult
lo.ForEach(cids, func(c *encoding.CID, _i int) {
ret := pinResult{
success: true,
error: nil,
}
err := s.pinEntity(jc.Request.Context(), userId, c)
if err != nil {
s.logger.Error("Error pinning entity", zap.Error(err))
ret.success = false
ret.error = err
}
b64, err := c.ToBase64Url()
if err != nil {
s.logger.Error("Error encoding CID to base64", zap.Error(err))
return
}
results[b64] = ret
})
jc.Encode(&results)
}
func (s *S5API) accountPin(jc jape.Context) { func (s *S5API) accountPin(jc jape.Context) {
var cid string var cid string
if err := jc.DecodeParam("cid", &cid); err != nil { if err := jc.DecodeParam("cid", &cid); err != nil {
@ -818,102 +915,141 @@ func (s *S5API) accountPin(jc jape.Context) {
} }
if !found { if !found {
dlUriProvider := s.newStorageLocationProvider(&decodedCid.Hash, true, types.StorageLocationTypeFull, types.StorageLocationTypeFile) if isCidManifest(decodedCid) {
s.accountPinManifest(jc, userID, decodedCid)
err = dlUriProvider.Start()
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, err))
return return
} }
if jc.Check("error starting search", err) != nil {
return
}
locations, err := dlUriProvider.All()
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, err))
return
}
locations = lo.FilterMap(locations, func(location storage2.SignedStorageLocation, index int) (storage2.SignedStorageLocation, bool) {
r := rq.Get(location.Location().BytesURL())
httpReq, err := r.ParseRequest()
if err != nil {
return nil, false
}
res, err := http.DefaultClient.Do(httpReq)
if err != nil {
return nil, false
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
s.logger.Error("Error closing response body", zap.Error(err))
}
}(res.Body)
contentLengthStr := res.Header.Get("Content-Length")
if contentLengthStr == "" {
return nil, false
}
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return nil, false
}
if uint64(contentLength) != decodedCid.Size {
return nil, false
}
return location, true
})
if len(locations) == 0 {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, fmt.Errorf("CID could not be found on the network")))
return
}
location := locations[0]
cid64, err := decodedCid.ToBase64Url()
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
jobName := fmt.Sprintf("pin-import-%s", cid64)
if job := s.cron.GetJobByName(jobName); job == nil {
job := s.cron.RetryableJob(
cron.RetryableJobParams{
Name: jobName,
Tags: nil,
Function: s.pinImportCronJob,
Args: []interface{}{cid64, location.Location().BytesURL(), location.Location().OutboardBytesURL(), userID},
Attempt: 0,
Limit: 10,
After: nil,
Error: nil,
},
)
_, err = s.cron.CreateJob(job)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
}
} }
jc.ResponseWriter.WriteHeader(http.StatusNoContent) jc.ResponseWriter.WriteHeader(http.StatusNoContent)
} }
func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) error {
found := true
if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
found = false
}
if found {
return nil
}
dlUriProvider := s.newStorageLocationProvider(&cid.Hash, true, types.StorageLocationTypeFull, types.StorageLocationTypeFile)
err := dlUriProvider.Start()
if err != nil {
return err
}
locations, err := dlUriProvider.All()
if err != nil {
return err
}
locations = lo.FilterMap(locations, func(location storage2.SignedStorageLocation, index int) (storage2.SignedStorageLocation, bool) {
r := rq.Get(location.Location().BytesURL())
httpReq, err := r.ParseRequest()
if err != nil {
return nil, false
}
res, err := http.DefaultClient.Do(httpReq)
if err != nil {
err = dlUriProvider.Downvote(location)
if err != nil {
s.logger.Error("Error downvoting location", zap.Error(err))
return nil, false
}
return nil, false
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
s.logger.Error("Error closing response body", zap.Error(err))
}
}(res.Body)
contentLengthStr := res.Header.Get("Content-Length")
if contentLengthStr == "" {
err = dlUriProvider.Downvote(location)
if err != nil {
s.logger.Error("Error downvoting location", zap.Error(err))
return nil, false
}
return nil, false
}
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return nil, false
}
if !isCidManifest(cid) {
if uint64(contentLength) != cid.Size {
return nil, false
}
} else {
data, err := io.ReadAll(res.Body)
if err != nil {
return nil, false
}
proof, err := s.storage.HashObject(ctx, bytes.NewReader(data))
if err != nil {
return nil, false
}
if !bytes.Equal(proof.Hash, cid.Hash.HashBytes()) {
return nil, false
}
}
return location, true
})
if len(locations) == 0 {
return fmt.Errorf("CID could not be found on the network")
}
location := locations[0]
cid64, err := cid.ToBase64Url()
if err != nil {
return nil
}
jobName := fmt.Sprintf("pin-import-%s", cid64)
if job := s.cron.GetJobByName(jobName); job == nil {
job := s.cron.RetryableJob(
cron.RetryableJobParams{
Name: jobName,
Tags: nil,
Function: s.pinImportCronJob,
Args: []interface{}{cid64, location.Location().BytesURL(), location.Location().OutboardBytesURL(), userId},
Attempt: 0,
Limit: 10,
After: nil,
Error: nil,
},
)
_, err = s.cron.CreateJob(job)
if err != nil {
return nil
}
}
return nil
}
func (s *S5API) directoryUpload(jc jape.Context) { func (s *S5API) directoryUpload(jc jape.Context) {
// Decode form fields // Decode form fields
var ( var (
@ -1198,6 +1334,7 @@ func (s *S5API) registrySet(jc jape.Context) {
return return
} }
} }
func (s *S5API) registrySubscription(jc jape.Context) { func (s *S5API) registrySubscription(jc jape.Context) {
// Create a context for the WebSocket operations // Create a context for the WebSocket operations
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -1747,6 +1884,16 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return nil return nil
} }
func isCidManifest(cid *encoding.CID) bool {
mTypes := []types.CIDType{
types.CIDTypeMetadataMedia,
types.CIDTypeMetadataWebapp,
types.CIDTypeUserIdentity,
}
return slices.Contains(mTypes, cid.Type)
}
func setAuthCookie(jwt string, jc jape.Context) { func setAuthCookie(jwt string, jc jape.Context) {
authCookie := http.Cookie{ authCookie := http.Cookie{
Name: "s5-auth-token", Name: "s5-auth-token",