From 7696997e53d270c3e9712a1f36b9a9539bb64b2d Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 22 Mar 2024 17:39:17 -0400 Subject: [PATCH] refactor: implement import reader in s5 pinning --- api/s5/s5.go | 77 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 65 insertions(+), 12 deletions(-) diff --git a/api/s5/s5.go b/api/s5/s5.go index dc318ff..7cebed9 100644 --- a/api/s5/s5.go +++ b/api/s5/s5.go @@ -22,6 +22,8 @@ import ( "strconv" "strings" + _import "git.lumeweb.com/LumeWeb/portal/import" + "git.lumeweb.com/LumeWeb/portal/api/router" "git.lumeweb.com/LumeWeb/portal/bao" "git.lumeweb.com/LumeWeb/portal/renter" @@ -88,6 +90,7 @@ type S5API struct { logger *zap.Logger tusHandler *s5.TusHandler cron *cron.CronServiceDefault + _import _import.ImportService } type APIParams struct { @@ -102,6 +105,7 @@ type APIParams struct { Logger *zap.Logger TusHandler *s5.TusHandler Cron *cron.CronServiceDefault + Import _import.ImportService } type S5ApiResult struct { @@ -122,6 +126,7 @@ func NewS5(params APIParams) (S5ApiResult, error) { logger: params.Logger, tusHandler: params.TusHandler, cron: params.Cron, + _import: params.Import, } return S5ApiResult{ API: api, @@ -224,8 +229,9 @@ func (s *S5API) Routes() (*httprouter.Router, error) { "OPTIONS /s5/download/:cid": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler), // Pins API - "POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw), - "DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw), + "POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw), + "GET /s5/pin/:cid/status": middleware.ApplyMiddlewares(s.accountPinStatus, middleware.ProxyMiddleware, authMw), + "DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw), // Debug API "GET /s5/debug/download_urls/:cid": middleware.ApplyMiddlewares(s.debugDownloadUrls, middleware.ProxyMiddleware, debugCors.Handler), @@ -1010,7 +1016,7 @@ func (s *S5API) accountPinManifest(jc jape.Context, userId uint, cid *encoding.C error: nil, cid: cid, } - err := s.pinEntity(ctx, userId, cid) + err := s.pinEntity(ctx, userId, jc.Request.RemoteAddr, cid) if err != nil { s.logger.Error("Error pinning entity", zap.Error(err)) ret.success = false @@ -1103,7 +1109,7 @@ func (s *S5API) accountPin(jc jape.Context) { s.accountPinManifest(jc, userID, decodedCid, true) return } else { - err = s.pinEntity(jc.Request.Context(), userID, decodedCid) + err = s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, decodedCid) if err != nil { s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err)) return @@ -1122,7 +1128,7 @@ func (s *S5API) accountPin(jc jape.Context) { s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err)) return } - err := s.pinEntity(jc.Request.Context(), userID, cid) + err := s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, cid) if err != nil { s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err)) return @@ -1134,7 +1140,11 @@ func (s *S5API) accountPin(jc jape.Context) { jc.ResponseWriter.WriteHeader(http.StatusNoContent) } -func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) error { +func (s *S5API) accountPinStatus(jc jape.Context) { + +} + +func (s *S5API) pinEntity(ctx context.Context, userId uint, userIp string, cid *encoding.CID) error { found := true if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil { @@ -1253,6 +1263,17 @@ func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) e if err != nil { return nil } + + err = s._import.SaveImport(ctx, _import.ImportMetadata{ + UserID: userId, + Hash: cid.Hash.HashBytes(), + Protocol: s5.GetStorageProtocol(s.protocol).Name(), + ImporterIP: userIp, + }, true) + if err != nil { + return err + } + } return nil @@ -1970,6 +1991,7 @@ func (s *S5API) newFile(params FileParams) *S5File { func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId uint) error { ctx := context.Background() + totalStages := 3 // Parse CID early to avoid unnecessary operations if it fails. parsedCid, err := encoding.CIDFromString(cid) @@ -1978,6 +2000,18 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId return err } + __import, err := s._import.GetImport(ctx, parsedCid.Hash.HashBytes()) + if err != nil { + return err + } + + __import.Status = models.ImportStatusProcessing + + err = s._import.SaveImport(ctx, __import, true) + if err != nil { + return err + } + // Function to streamline error handling and closing of response body. closeBody := func(body io.ReadCloser) { if err := body.Close(); err != nil { @@ -1998,7 +2032,10 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId s.logger.Error("error executing request", zap.Error(err)) return nil, err } - defer closeBody(res.Body) + + importReader := _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 1, totalStages) + + defer closeBody(importReader) if res.StatusCode != http.StatusOK { errMsg := "error fetching URL: " + fetchUrl @@ -2006,7 +2043,7 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId return nil, fmt.Errorf(errMsg+" with status: %s", res.Status) } - data, err := io.ReadAll(res.Body) + data, err := io.ReadAll(importReader) if err != nil { s.logger.Error("error reading response body", zap.Error(err)) return nil, err @@ -2020,7 +2057,12 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId return err } - if err := s.accounts.PinByHash(parsedCid.Hash.HashBytes(), userId); err != nil { + if err := s.accounts.PinByHash(upload.Hash, userId); err != nil { + return err + } + + err = s._import.DeleteImport(ctx, upload.Hash) + if err != nil { return err } @@ -2043,7 +2085,9 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId return fmt.Errorf("hash mismatch") } - upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), bytes.NewReader(fileData), nil, hash) + importReader := _import.NewImportReader(s._import, __import, bytes.NewReader(fileData), parsedCid.Size, 2, totalStages) + + upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), importReader, nil, hash) if err != nil { return err } @@ -2096,11 +2140,13 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId }(verifier) + importReader := _import.NewImportReader(s._import, __import, verifier, parsedCid.Size, 2, totalStages) + if parsedCid.Size < storage.S3_MULTIPART_MIN_PART_SIZE { _, err = client.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket), Key: aws.String(cid), - Body: verifier, + Body: importReader, ContentLength: aws.Int64(int64(parsedCid.Size)), }) if err != nil { @@ -2108,13 +2154,15 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId return err } } else { - err := s.storage.S3MultipartUpload(ctx, verifier, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size) + err := s.storage.S3MultipartUpload(ctx, importReader, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size) if err != nil { s.logger.Error("error uploading object", zap.Error(err)) return err } } + importReader = _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 3, totalStages) + upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, &renter.MultiPartUploadParams{ ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) { rangeHeader := "bytes=%d-" @@ -2134,6 +2182,11 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId return nil, err } + err = importReader.ReadBytes(int(end - start)) + if err != nil { + return nil, err + } + return object.Body, nil }, Bucket: s.config.Config().Core.Storage.S3.BufferBucket,