refactor: implement import reader in s5 pinning

This commit is contained in:
Derrick Hammer 2024-03-22 17:39:17 -04:00
parent 5523d5e60d
commit 7696997e53
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 65 additions and 12 deletions

View File

@ -22,6 +22,8 @@ import (
"strconv" "strconv"
"strings" "strings"
_import "git.lumeweb.com/LumeWeb/portal/import"
"git.lumeweb.com/LumeWeb/portal/api/router" "git.lumeweb.com/LumeWeb/portal/api/router"
"git.lumeweb.com/LumeWeb/portal/bao" "git.lumeweb.com/LumeWeb/portal/bao"
"git.lumeweb.com/LumeWeb/portal/renter" "git.lumeweb.com/LumeWeb/portal/renter"
@ -88,6 +90,7 @@ type S5API struct {
logger *zap.Logger logger *zap.Logger
tusHandler *s5.TusHandler tusHandler *s5.TusHandler
cron *cron.CronServiceDefault cron *cron.CronServiceDefault
_import _import.ImportService
} }
type APIParams struct { type APIParams struct {
@ -102,6 +105,7 @@ type APIParams struct {
Logger *zap.Logger Logger *zap.Logger
TusHandler *s5.TusHandler TusHandler *s5.TusHandler
Cron *cron.CronServiceDefault Cron *cron.CronServiceDefault
Import _import.ImportService
} }
type S5ApiResult struct { type S5ApiResult struct {
@ -122,6 +126,7 @@ func NewS5(params APIParams) (S5ApiResult, error) {
logger: params.Logger, logger: params.Logger,
tusHandler: params.TusHandler, tusHandler: params.TusHandler,
cron: params.Cron, cron: params.Cron,
_import: params.Import,
} }
return S5ApiResult{ return S5ApiResult{
API: api, API: api,
@ -225,6 +230,7 @@ func (s *S5API) Routes() (*httprouter.Router, error) {
// Pins API // Pins API
"POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw), "POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw),
"GET /s5/pin/:cid/status": middleware.ApplyMiddlewares(s.accountPinStatus, middleware.ProxyMiddleware, authMw),
"DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw), "DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw),
// Debug API // Debug API
@ -1010,7 +1016,7 @@ func (s *S5API) accountPinManifest(jc jape.Context, userId uint, cid *encoding.C
error: nil, error: nil,
cid: cid, cid: cid,
} }
err := s.pinEntity(ctx, userId, cid) err := s.pinEntity(ctx, userId, jc.Request.RemoteAddr, cid)
if err != nil { if err != nil {
s.logger.Error("Error pinning entity", zap.Error(err)) s.logger.Error("Error pinning entity", zap.Error(err))
ret.success = false ret.success = false
@ -1103,7 +1109,7 @@ func (s *S5API) accountPin(jc jape.Context) {
s.accountPinManifest(jc, userID, decodedCid, true) s.accountPinManifest(jc, userID, decodedCid, true)
return return
} else { } else {
err = s.pinEntity(jc.Request.Context(), userID, decodedCid) err = s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, decodedCid)
if err != nil { if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err)) s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return return
@ -1122,7 +1128,7 @@ func (s *S5API) accountPin(jc jape.Context) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err)) s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return return
} }
err := s.pinEntity(jc.Request.Context(), userID, cid) err := s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, cid)
if err != nil { if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err)) s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return return
@ -1134,7 +1140,11 @@ func (s *S5API) accountPin(jc jape.Context) {
jc.ResponseWriter.WriteHeader(http.StatusNoContent) jc.ResponseWriter.WriteHeader(http.StatusNoContent)
} }
func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) error { func (s *S5API) accountPinStatus(jc jape.Context) {
}
func (s *S5API) pinEntity(ctx context.Context, userId uint, userIp string, cid *encoding.CID) error {
found := true found := true
if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil { if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil {
@ -1253,6 +1263,17 @@ func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) e
if err != nil { if err != nil {
return nil return nil
} }
err = s._import.SaveImport(ctx, _import.ImportMetadata{
UserID: userId,
Hash: cid.Hash.HashBytes(),
Protocol: s5.GetStorageProtocol(s.protocol).Name(),
ImporterIP: userIp,
}, true)
if err != nil {
return err
}
} }
return nil return nil
@ -1970,6 +1991,7 @@ func (s *S5API) newFile(params FileParams) *S5File {
func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId uint) error { func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId uint) error {
ctx := context.Background() ctx := context.Background()
totalStages := 3
// Parse CID early to avoid unnecessary operations if it fails. // Parse CID early to avoid unnecessary operations if it fails.
parsedCid, err := encoding.CIDFromString(cid) parsedCid, err := encoding.CIDFromString(cid)
@ -1978,6 +2000,18 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return err return err
} }
__import, err := s._import.GetImport(ctx, parsedCid.Hash.HashBytes())
if err != nil {
return err
}
__import.Status = models.ImportStatusProcessing
err = s._import.SaveImport(ctx, __import, true)
if err != nil {
return err
}
// Function to streamline error handling and closing of response body. // Function to streamline error handling and closing of response body.
closeBody := func(body io.ReadCloser) { closeBody := func(body io.ReadCloser) {
if err := body.Close(); err != nil { if err := body.Close(); err != nil {
@ -1998,7 +2032,10 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
s.logger.Error("error executing request", zap.Error(err)) s.logger.Error("error executing request", zap.Error(err))
return nil, err return nil, err
} }
defer closeBody(res.Body)
importReader := _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 1, totalStages)
defer closeBody(importReader)
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
errMsg := "error fetching URL: " + fetchUrl errMsg := "error fetching URL: " + fetchUrl
@ -2006,7 +2043,7 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return nil, fmt.Errorf(errMsg+" with status: %s", res.Status) return nil, fmt.Errorf(errMsg+" with status: %s", res.Status)
} }
data, err := io.ReadAll(res.Body) data, err := io.ReadAll(importReader)
if err != nil { if err != nil {
s.logger.Error("error reading response body", zap.Error(err)) s.logger.Error("error reading response body", zap.Error(err))
return nil, err return nil, err
@ -2020,7 +2057,12 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return err return err
} }
if err := s.accounts.PinByHash(parsedCid.Hash.HashBytes(), userId); err != nil { if err := s.accounts.PinByHash(upload.Hash, userId); err != nil {
return err
}
err = s._import.DeleteImport(ctx, upload.Hash)
if err != nil {
return err return err
} }
@ -2043,7 +2085,9 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return fmt.Errorf("hash mismatch") return fmt.Errorf("hash mismatch")
} }
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), bytes.NewReader(fileData), nil, hash) importReader := _import.NewImportReader(s._import, __import, bytes.NewReader(fileData), parsedCid.Size, 2, totalStages)
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), importReader, nil, hash)
if err != nil { if err != nil {
return err return err
} }
@ -2096,11 +2140,13 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
}(verifier) }(verifier)
importReader := _import.NewImportReader(s._import, __import, verifier, parsedCid.Size, 2, totalStages)
if parsedCid.Size < storage.S3_MULTIPART_MIN_PART_SIZE { if parsedCid.Size < storage.S3_MULTIPART_MIN_PART_SIZE {
_, err = client.PutObject(ctx, &s3.PutObjectInput{ _, err = client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket), Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
Key: aws.String(cid), Key: aws.String(cid),
Body: verifier, Body: importReader,
ContentLength: aws.Int64(int64(parsedCid.Size)), ContentLength: aws.Int64(int64(parsedCid.Size)),
}) })
if err != nil { if err != nil {
@ -2108,13 +2154,15 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return err return err
} }
} else { } else {
err := s.storage.S3MultipartUpload(ctx, verifier, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size) err := s.storage.S3MultipartUpload(ctx, importReader, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size)
if err != nil { if err != nil {
s.logger.Error("error uploading object", zap.Error(err)) s.logger.Error("error uploading object", zap.Error(err))
return err return err
} }
} }
importReader = _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 3, totalStages)
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, &renter.MultiPartUploadParams{ upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, &renter.MultiPartUploadParams{
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) { ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
rangeHeader := "bytes=%d-" rangeHeader := "bytes=%d-"
@ -2134,6 +2182,11 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return nil, err return nil, err
} }
err = importReader.ReadBytes(int(end - start))
if err != nil {
return nil, err
}
return object.Body, nil return object.Body, nil
}, },
Bucket: s.config.Config().Core.Storage.S3.BufferBucket, Bucket: s.config.Config().Core.Storage.S3.BufferBucket,