feat: add ability for pinning to import a CID via cron task
This commit is contained in:
parent
0010d6c5b9
commit
5c3d1144d4
279
api/s5/s5.go
279
api/s5/s5.go
|
@ -10,6 +10,10 @@ import (
|
|||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.lumeweb.com/LumeWeb/portal/bao"
|
||||
"git.lumeweb.com/LumeWeb/portal/renter"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"io"
|
||||
"math"
|
||||
"mime/multipart"
|
||||
|
@ -19,6 +23,8 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"git.lumeweb.com/LumeWeb/portal/cron"
|
||||
|
||||
"git.lumeweb.com/LumeWeb/portal/config"
|
||||
|
||||
"git.lumeweb.com/LumeWeb/portal/api/swagger"
|
||||
|
@ -49,6 +55,7 @@ import (
|
|||
"git.lumeweb.com/LumeWeb/portal/api/registry"
|
||||
protoRegistry "git.lumeweb.com/LumeWeb/portal/protocols/registry"
|
||||
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
|
||||
"github.com/ddo/rq"
|
||||
"github.com/rs/cors"
|
||||
"go.sia.tech/jape"
|
||||
"go.uber.org/fx"
|
||||
|
@ -72,6 +79,7 @@ type S5API struct {
|
|||
protocol *s5.S5Protocol
|
||||
logger *zap.Logger
|
||||
tusHandler *s5.TusHandler
|
||||
cron *cron.CronServiceDefault
|
||||
}
|
||||
|
||||
type APIParams struct {
|
||||
|
@ -85,6 +93,7 @@ type APIParams struct {
|
|||
Protocols []protoRegistry.Protocol `group:"protocol"`
|
||||
Logger *zap.Logger
|
||||
TusHandler *s5.TusHandler
|
||||
Cron *cron.CronServiceDefault
|
||||
}
|
||||
|
||||
type S5ApiResult struct {
|
||||
|
@ -104,6 +113,7 @@ func NewS5(params APIParams) (S5ApiResult, error) {
|
|||
protocols: params.Protocols,
|
||||
logger: params.Logger,
|
||||
tusHandler: params.TusHandler,
|
||||
cron: params.Cron,
|
||||
}
|
||||
return S5ApiResult{
|
||||
API: api,
|
||||
|
@ -720,9 +730,101 @@ func (s *S5API) accountPin(jc jape.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
found := true
|
||||
|
||||
if err := s.accounts.PinByHash(decodedCid.Hash.HashBytes(), userID); err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
||||
return
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
||||
return
|
||||
}
|
||||
found = false
|
||||
}
|
||||
|
||||
if !found {
|
||||
dlUriProvider := s.newStorageLocationProvider(&decodedCid.Hash, types.StorageLocationTypeFull, types.StorageLocationTypeFile)
|
||||
|
||||
err = dlUriProvider.Start()
|
||||
|
||||
if err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, err))
|
||||
return
|
||||
}
|
||||
|
||||
if jc.Check("error starting search", err) != nil {
|
||||
return
|
||||
}
|
||||
|
||||
next, err := dlUriProvider.Next()
|
||||
if err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, err))
|
||||
return
|
||||
}
|
||||
|
||||
r := rq.Head(next.Location().BytesURL())
|
||||
httpReq, err := r.ParseRequest()
|
||||
|
||||
if err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
|
||||
return
|
||||
}
|
||||
|
||||
res, err := http.DefaultClient.Do(httpReq)
|
||||
|
||||
if err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileDownloadFailed, err))
|
||||
}
|
||||
defer func(Body io.ReadCloser) {
|
||||
err := Body.Close()
|
||||
if err != nil {
|
||||
s.logger.Error("Error closing response body", zap.Error(err))
|
||||
}
|
||||
}(res.Body)
|
||||
|
||||
contentLengthStr := res.Header.Get("Content-Length")
|
||||
if contentLengthStr == "" {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileDownloadFailed, errors.New("content-length header is missing")))
|
||||
return
|
||||
}
|
||||
|
||||
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
|
||||
if err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileDownloadFailed, err))
|
||||
return
|
||||
}
|
||||
|
||||
if uint64(contentLength) != decodedCid.Size {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileDownloadFailed, errors.New("file size does not match CID expected size")))
|
||||
return
|
||||
}
|
||||
|
||||
cid64, err := decodedCid.ToBase64Url()
|
||||
if err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
|
||||
return
|
||||
}
|
||||
|
||||
jobName := fmt.Sprintf("pin-import-%s", cid64)
|
||||
|
||||
if task := s.cron.GetJobByName(jobName); task == nil {
|
||||
task := s.cron.RetryableTask(
|
||||
cron.RetryableTaskParams{
|
||||
Name: jobName,
|
||||
Tags: nil,
|
||||
Function: s.pinImportCronTask,
|
||||
Args: []interface{}{cid64, next.Location().BytesURL(), next.Location().OutboardBytesURL(), userID},
|
||||
Attempt: 0,
|
||||
Limit: 10,
|
||||
After: nil,
|
||||
Error: nil,
|
||||
},
|
||||
)
|
||||
|
||||
_, err = s.cron.CreateJob(task)
|
||||
if err != nil {
|
||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
jc.ResponseWriter.WriteHeader(http.StatusNoContent)
|
||||
|
@ -1370,6 +1472,179 @@ func (s *S5API) newFile(protocol *s5.S5Protocol, hash []byte) *S5File {
|
|||
})
|
||||
}
|
||||
|
||||
func (s *S5API) pinImportCronTask(cid string, url string, proofUrl string, userId uint) error {
|
||||
ctx := context.Background()
|
||||
|
||||
// Parse CID early to avoid unnecessary operations if it fails.
|
||||
parsedCid, err := encoding.CIDFromString(cid)
|
||||
if err != nil {
|
||||
s.logger.Error("error parsing cid", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Function to streamline error handling and closing of response body.
|
||||
closeBody := func(body io.ReadCloser) {
|
||||
if err := body.Close(); err != nil {
|
||||
s.logger.Error("error closing response body", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Inline fetching and reading body, directly incorporating your checks.
|
||||
fetchAndProcess := func(fetchUrl string) ([]byte, error) {
|
||||
req, err := rq.Get(fetchUrl).ParseRequest()
|
||||
if err != nil {
|
||||
s.logger.Error("error parsing request", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
s.logger.Error("error executing request", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
defer closeBody(res.Body)
|
||||
|
||||
if res.StatusCode != http.StatusOK {
|
||||
errMsg := "error fetching URL: " + fetchUrl
|
||||
s.logger.Error(errMsg, zap.String("status", res.Status))
|
||||
return nil, fmt.Errorf(errMsg+" with status: %s", res.Status)
|
||||
}
|
||||
|
||||
data, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
s.logger.Error("error reading response body", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
saveAndPin := func(upload *metadata.UploadMetadata) error {
|
||||
upload.UserID = userId
|
||||
if err := s.metadata.SaveUpload(ctx, *upload); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.accounts.PinByHash(parsedCid.Hash.HashBytes(), userId); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fetch proof.
|
||||
proof, err := fetchAndProcess(proofUrl)
|
||||
if err != nil {
|
||||
return err // Error logged in fetchAndProcess
|
||||
}
|
||||
|
||||
// Fetch file and process if under post upload limit.
|
||||
if parsedCid.Size <= s.config.Config().Core.PostUploadLimit {
|
||||
fileData, err := fetchAndProcess(url)
|
||||
if err != nil {
|
||||
return err // Error logged in fetchAndProcess
|
||||
}
|
||||
|
||||
hash, err := s.storage.HashObject(ctx, bytes.NewReader(fileData))
|
||||
if err != nil {
|
||||
s.logger.Error("error hashing object", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if !bytes.Equal(hash.Hash, parsedCid.Hash.HashBytes()) {
|
||||
return fmt.Errorf("hash mismatch")
|
||||
}
|
||||
|
||||
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), bytes.NewReader(fileData), nil, hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = saveAndPin(upload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
baoProof := bao.Result{
|
||||
Hash: parsedCid.Hash.HashBytes(),
|
||||
Proof: proof,
|
||||
Length: uint(parsedCid.Size),
|
||||
}
|
||||
|
||||
client, err := s.storage.S3Client(ctx)
|
||||
if err != nil {
|
||||
s.logger.Error("error getting s3 client", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := rq.Get(url).ParseRequest()
|
||||
if err != nil {
|
||||
s.logger.Error("error parsing request", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
s.logger.Error("error executing request", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer closeBody(res.Body)
|
||||
|
||||
verifier := bao.NewVerifier(res.Body, baoProof)
|
||||
defer func(Body io.ReadCloser) {
|
||||
err := Body.Close()
|
||||
if err != nil {
|
||||
s.logger.Error("error closing verifier stream", zap.Error(err))
|
||||
}
|
||||
|
||||
}(verifier)
|
||||
|
||||
_, err = client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
|
||||
Key: aws.String(cid),
|
||||
Body: verifier,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, &renter.MultiPartUploadParams{
|
||||
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
|
||||
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
|
||||
object, err := client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
|
||||
Key: aws.String(cid),
|
||||
Range: aws.String(rangeHeader),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return object.Body, nil
|
||||
},
|
||||
Bucket: s.config.Config().Core.Storage.S3.BufferBucket,
|
||||
FileName: s5.GetStorageProtocol(s.protocol).EncodeFileName(parsedCid.Hash.HashBytes()),
|
||||
Size: parsedCid.Size,
|
||||
ExistingUploadID: "",
|
||||
UploadIDHandler: nil,
|
||||
}, &baoProof)
|
||||
|
||||
if err != nil {
|
||||
s.logger.Error("error uploading object", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = saveAndPin(upload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setAuthCookie(jwt string, jc jape.Context) {
|
||||
authCookie := http.Cookie{
|
||||
Name: "s5-auth-token",
|
||||
|
|
1
go.mod
1
go.mod
|
@ -10,6 +10,7 @@ require (
|
|||
github.com/aws/aws-sdk-go-v2/credentials v1.17.2
|
||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.50.3
|
||||
github.com/casbin/casbin/v2 v2.82.0
|
||||
github.com/ddo/rq v0.0.0-20190828174524-b3daa55fcaba
|
||||
github.com/docker/go-units v0.5.0
|
||||
github.com/getkin/kin-openapi v0.118.0
|
||||
github.com/go-co-op/gocron/v2 v2.2.4
|
||||
|
|
6
go.sum
6
go.sum
|
@ -8,8 +8,8 @@ github.com/AfterShip/email-verifier v1.4.0/go.mod h1:JNPV1KZpTq4TArmss1NAOJsTD8J
|
|||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/LumeWeb/jape v0.0.0-20240204004049-ed792e7631cd h1:DHP7nn0Dg0I0WADOwBu2zR+p3mKgrU8h7HZ5OnyI/Q8=
|
||||
github.com/LumeWeb/jape v0.0.0-20240204004049-ed792e7631cd/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
|
||||
github.com/LumeWeb/tusd/v2 v2.2.3-0.20240224142744-a6c7f0707b2a h1:WNNQ1HKFEcUmBwduEiESLVmUmN4wgYvnvBVPapDYIMs=
|
||||
github.com/LumeWeb/tusd/v2 v2.2.3-0.20240224142744-a6c7f0707b2a/go.mod h1:lqzUzWTG5OwezKgA1HJ+uwyjJusv6StTYdLTIvo0nxE=
|
||||
github.com/LumeWeb/tusd/v2 v2.2.3-0.20240224143554-96925dd43120 h1:+FYQ83a3c0p6Y/sqp3373CJ2m/b0mskdG3ma/opxtlk=
|
||||
github.com/LumeWeb/tusd/v2 v2.2.3-0.20240224143554-96925dd43120/go.mod h1:lqzUzWTG5OwezKgA1HJ+uwyjJusv6StTYdLTIvo0nxE=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
|
||||
|
@ -90,6 +90,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
|
|||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dchest/threefish v0.0.0-20120919164726-3ecf4c494abf h1:K5VXW9LjmJv/xhjvQcNWTdk4WOSyreil6YaubuCPeRY=
|
||||
github.com/dchest/threefish v0.0.0-20120919164726-3ecf4c494abf/go.mod h1:bXVurdTuvOiJu7NHALemFe0JMvC2UmwYHW+7fcZaZ2M=
|
||||
github.com/ddo/rq v0.0.0-20190828174524-b3daa55fcaba h1:EMLQSxP68m4ddJpmMT+LizYO0AjrROprPXjll2CARj0=
|
||||
github.com/ddo/rq v0.0.0-20190828174524-b3daa55fcaba/go.mod h1:XIayI7kdKklkc7yyWDBYMJLbK/AO4AchQUxdoSFcn+k=
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
|
|
Loading…
Reference in New Issue