Compare commits
No commits in common. "7696997e53d270c3e9712a1f36b9a9539bb64b2d" and "d06f436fa1ecafb0fdd84299e12c4a0b99496b98" have entirely different histories.
7696997e53
...
d06f436fa1
73
api/s5/s5.go
73
api/s5/s5.go
|
@ -22,8 +22,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
_import "git.lumeweb.com/LumeWeb/portal/import"
|
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/api/router"
|
"git.lumeweb.com/LumeWeb/portal/api/router"
|
||||||
"git.lumeweb.com/LumeWeb/portal/bao"
|
"git.lumeweb.com/LumeWeb/portal/bao"
|
||||||
"git.lumeweb.com/LumeWeb/portal/renter"
|
"git.lumeweb.com/LumeWeb/portal/renter"
|
||||||
|
@ -90,7 +88,6 @@ type S5API struct {
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
tusHandler *s5.TusHandler
|
tusHandler *s5.TusHandler
|
||||||
cron *cron.CronServiceDefault
|
cron *cron.CronServiceDefault
|
||||||
_import _import.ImportService
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type APIParams struct {
|
type APIParams struct {
|
||||||
|
@ -105,7 +102,6 @@ type APIParams struct {
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
TusHandler *s5.TusHandler
|
TusHandler *s5.TusHandler
|
||||||
Cron *cron.CronServiceDefault
|
Cron *cron.CronServiceDefault
|
||||||
Import _import.ImportService
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type S5ApiResult struct {
|
type S5ApiResult struct {
|
||||||
|
@ -126,7 +122,6 @@ func NewS5(params APIParams) (S5ApiResult, error) {
|
||||||
logger: params.Logger,
|
logger: params.Logger,
|
||||||
tusHandler: params.TusHandler,
|
tusHandler: params.TusHandler,
|
||||||
cron: params.Cron,
|
cron: params.Cron,
|
||||||
_import: params.Import,
|
|
||||||
}
|
}
|
||||||
return S5ApiResult{
|
return S5ApiResult{
|
||||||
API: api,
|
API: api,
|
||||||
|
@ -230,7 +225,6 @@ func (s *S5API) Routes() (*httprouter.Router, error) {
|
||||||
|
|
||||||
// Pins API
|
// Pins API
|
||||||
"POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw),
|
"POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw),
|
||||||
"GET /s5/pin/:cid/status": middleware.ApplyMiddlewares(s.accountPinStatus, middleware.ProxyMiddleware, authMw),
|
|
||||||
"DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw),
|
"DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw),
|
||||||
|
|
||||||
// Debug API
|
// Debug API
|
||||||
|
@ -1016,7 +1010,7 @@ func (s *S5API) accountPinManifest(jc jape.Context, userId uint, cid *encoding.C
|
||||||
error: nil,
|
error: nil,
|
||||||
cid: cid,
|
cid: cid,
|
||||||
}
|
}
|
||||||
err := s.pinEntity(ctx, userId, jc.Request.RemoteAddr, cid)
|
err := s.pinEntity(ctx, userId, cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("Error pinning entity", zap.Error(err))
|
s.logger.Error("Error pinning entity", zap.Error(err))
|
||||||
ret.success = false
|
ret.success = false
|
||||||
|
@ -1109,7 +1103,7 @@ func (s *S5API) accountPin(jc jape.Context) {
|
||||||
s.accountPinManifest(jc, userID, decodedCid, true)
|
s.accountPinManifest(jc, userID, decodedCid, true)
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
err = s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, decodedCid)
|
err = s.pinEntity(jc.Request.Context(), userID, decodedCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
||||||
return
|
return
|
||||||
|
@ -1128,7 +1122,7 @@ func (s *S5API) accountPin(jc jape.Context) {
|
||||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err := s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, cid)
|
err := s.pinEntity(jc.Request.Context(), userID, cid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
||||||
return
|
return
|
||||||
|
@ -1140,11 +1134,7 @@ func (s *S5API) accountPin(jc jape.Context) {
|
||||||
jc.ResponseWriter.WriteHeader(http.StatusNoContent)
|
jc.ResponseWriter.WriteHeader(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S5API) accountPinStatus(jc jape.Context) {
|
func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) error {
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *S5API) pinEntity(ctx context.Context, userId uint, userIp string, cid *encoding.CID) error {
|
|
||||||
found := true
|
found := true
|
||||||
|
|
||||||
if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil {
|
if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil {
|
||||||
|
@ -1263,17 +1253,6 @@ func (s *S5API) pinEntity(ctx context.Context, userId uint, userIp string, cid *
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s._import.SaveImport(ctx, _import.ImportMetadata{
|
|
||||||
UserID: userId,
|
|
||||||
Hash: cid.Hash.HashBytes(),
|
|
||||||
Protocol: s5.GetStorageProtocol(s.protocol).Name(),
|
|
||||||
ImporterIP: userIp,
|
|
||||||
}, true)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -1991,7 +1970,6 @@ func (s *S5API) newFile(params FileParams) *S5File {
|
||||||
|
|
||||||
func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId uint) error {
|
func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId uint) error {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
totalStages := 3
|
|
||||||
|
|
||||||
// Parse CID early to avoid unnecessary operations if it fails.
|
// Parse CID early to avoid unnecessary operations if it fails.
|
||||||
parsedCid, err := encoding.CIDFromString(cid)
|
parsedCid, err := encoding.CIDFromString(cid)
|
||||||
|
@ -2000,18 +1978,6 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
__import, err := s._import.GetImport(ctx, parsedCid.Hash.HashBytes())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
__import.Status = models.ImportStatusProcessing
|
|
||||||
|
|
||||||
err = s._import.SaveImport(ctx, __import, true)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Function to streamline error handling and closing of response body.
|
// Function to streamline error handling and closing of response body.
|
||||||
closeBody := func(body io.ReadCloser) {
|
closeBody := func(body io.ReadCloser) {
|
||||||
if err := body.Close(); err != nil {
|
if err := body.Close(); err != nil {
|
||||||
|
@ -2032,10 +1998,7 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
s.logger.Error("error executing request", zap.Error(err))
|
s.logger.Error("error executing request", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer closeBody(res.Body)
|
||||||
importReader := _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 1, totalStages)
|
|
||||||
|
|
||||||
defer closeBody(importReader)
|
|
||||||
|
|
||||||
if res.StatusCode != http.StatusOK {
|
if res.StatusCode != http.StatusOK {
|
||||||
errMsg := "error fetching URL: " + fetchUrl
|
errMsg := "error fetching URL: " + fetchUrl
|
||||||
|
@ -2043,7 +2006,7 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
return nil, fmt.Errorf(errMsg+" with status: %s", res.Status)
|
return nil, fmt.Errorf(errMsg+" with status: %s", res.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := io.ReadAll(importReader)
|
data, err := io.ReadAll(res.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("error reading response body", zap.Error(err))
|
s.logger.Error("error reading response body", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -2057,12 +2020,7 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.accounts.PinByHash(upload.Hash, userId); err != nil {
|
if err := s.accounts.PinByHash(parsedCid.Hash.HashBytes(), userId); err != nil {
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s._import.DeleteImport(ctx, upload.Hash)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2085,9 +2043,7 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
return fmt.Errorf("hash mismatch")
|
return fmt.Errorf("hash mismatch")
|
||||||
}
|
}
|
||||||
|
|
||||||
importReader := _import.NewImportReader(s._import, __import, bytes.NewReader(fileData), parsedCid.Size, 2, totalStages)
|
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), bytes.NewReader(fileData), nil, hash)
|
||||||
|
|
||||||
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), importReader, nil, hash)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -2140,13 +2096,11 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
|
|
||||||
}(verifier)
|
}(verifier)
|
||||||
|
|
||||||
importReader := _import.NewImportReader(s._import, __import, verifier, parsedCid.Size, 2, totalStages)
|
|
||||||
|
|
||||||
if parsedCid.Size < storage.S3_MULTIPART_MIN_PART_SIZE {
|
if parsedCid.Size < storage.S3_MULTIPART_MIN_PART_SIZE {
|
||||||
_, err = client.PutObject(ctx, &s3.PutObjectInput{
|
_, err = client.PutObject(ctx, &s3.PutObjectInput{
|
||||||
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
|
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
|
||||||
Key: aws.String(cid),
|
Key: aws.String(cid),
|
||||||
Body: importReader,
|
Body: verifier,
|
||||||
ContentLength: aws.Int64(int64(parsedCid.Size)),
|
ContentLength: aws.Int64(int64(parsedCid.Size)),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2154,15 +2108,13 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err := s.storage.S3MultipartUpload(ctx, importReader, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size)
|
err := s.storage.S3MultipartUpload(ctx, verifier, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("error uploading object", zap.Error(err))
|
s.logger.Error("error uploading object", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
importReader = _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 3, totalStages)
|
|
||||||
|
|
||||||
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, &renter.MultiPartUploadParams{
|
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, &renter.MultiPartUploadParams{
|
||||||
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
|
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
|
||||||
rangeHeader := "bytes=%d-"
|
rangeHeader := "bytes=%d-"
|
||||||
|
@ -2182,11 +2134,6 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = importReader.ReadBytes(int(end - start))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return object.Body, nil
|
return object.Body, nil
|
||||||
},
|
},
|
||||||
Bucket: s.config.Config().Core.Storage.S3.BufferBucket,
|
Bucket: s.config.Config().Core.Storage.S3.BufferBucket,
|
||||||
|
|
|
@ -4,8 +4,6 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
_import "git.lumeweb.com/LumeWeb/portal/import"
|
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/mailer"
|
"git.lumeweb.com/LumeWeb/portal/mailer"
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/config"
|
"git.lumeweb.com/LumeWeb/portal/config"
|
||||||
|
@ -63,7 +61,6 @@ func main() {
|
||||||
cron.Module,
|
cron.Module,
|
||||||
account.Module,
|
account.Module,
|
||||||
metadata.Module,
|
metadata.Module,
|
||||||
_import.Module,
|
|
||||||
mailer.Module,
|
mailer.Module,
|
||||||
protocols.BuildProtocols(cfg),
|
protocols.BuildProtocols(cfg),
|
||||||
api.BuildApis(cfg),
|
api.BuildApis(cfg),
|
||||||
|
|
|
@ -1,25 +0,0 @@
|
||||||
package models
|
|
||||||
|
|
||||||
import "gorm.io/gorm"
|
|
||||||
|
|
||||||
type ImportStatus string
|
|
||||||
|
|
||||||
const (
|
|
||||||
ImportStatusQueued ImportStatus = "queued"
|
|
||||||
ImportStatusProcessing ImportStatus = "processing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
registerModel(&Upload{})
|
|
||||||
}
|
|
||||||
|
|
||||||
type Import struct {
|
|
||||||
gorm.Model
|
|
||||||
UserID uint
|
|
||||||
Hash []byte `gorm:"type:binary(32);"`
|
|
||||||
Protocol string
|
|
||||||
User User
|
|
||||||
ImporterIP string
|
|
||||||
Status ImportStatus
|
|
||||||
Progress float64
|
|
||||||
}
|
|
269
import/import.go
269
import/import.go
|
@ -1,269 +0,0 @@
|
||||||
package _import
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/db/models"
|
|
||||||
|
|
||||||
"go.uber.org/fx"
|
|
||||||
"gorm.io/gorm"
|
|
||||||
)
|
|
||||||
|
|
||||||
var ErrNotFound = gorm.ErrRecordNotFound
|
|
||||||
|
|
||||||
var _ ImportService = (*ImportServiceDefault)(nil)
|
|
||||||
var _ io.ReadSeekCloser = (*ImportReader)(nil)
|
|
||||||
|
|
||||||
type ImportMetadata struct {
|
|
||||||
ID uint
|
|
||||||
UserID uint
|
|
||||||
Hash []byte
|
|
||||||
Status models.ImportStatus
|
|
||||||
Progress float64
|
|
||||||
Protocol string
|
|
||||||
ImporterIP string
|
|
||||||
Created time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
type ImportService interface {
|
|
||||||
SaveImport(ctx context.Context, metadata ImportMetadata, skipExisting bool) error
|
|
||||||
GetImport(ctx context.Context, objectHash []byte) (ImportMetadata, error)
|
|
||||||
DeleteImport(ctx context.Context, objectHash []byte) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (u ImportMetadata) IsEmpty() bool {
|
|
||||||
if u.UserID != 0 || u.Protocol != "" || u.ImporterIP != "" || u.Status != "" {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if !u.Created.IsZero() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(u.Hash) != 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
var Module = fx.Module("import",
|
|
||||||
fx.Provide(
|
|
||||||
fx.Annotate(
|
|
||||||
NewImportService,
|
|
||||||
fx.As(new(ImportService)),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
type ImportServiceDefault struct {
|
|
||||||
db *gorm.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i ImportServiceDefault) SaveImport(ctx context.Context, metadata ImportMetadata, skipExisting bool) error {
|
|
||||||
var __import models.Import
|
|
||||||
|
|
||||||
__import.Hash = metadata.Hash
|
|
||||||
|
|
||||||
ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&__import).First(&__import)
|
|
||||||
|
|
||||||
if ret.Error != nil {
|
|
||||||
if errors.Is(ret.Error, gorm.ErrRecordNotFound) {
|
|
||||||
return i.createImport(ctx, metadata)
|
|
||||||
}
|
|
||||||
return ret.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
if skipExisting {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
changed := false
|
|
||||||
|
|
||||||
if __import.UserID != metadata.UserID {
|
|
||||||
changed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if __import.Status != metadata.Status {
|
|
||||||
changed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if __import.Progress != metadata.Progress {
|
|
||||||
changed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if __import.Protocol != metadata.Protocol {
|
|
||||||
changed = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if __import.ImporterIP != metadata.ImporterIP {
|
|
||||||
changed = true
|
|
||||||
}
|
|
||||||
if changed {
|
|
||||||
return i.db.Updates(&__import).Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *ImportServiceDefault) createImport(ctx context.Context, metadata ImportMetadata) error {
|
|
||||||
__import := models.Import{
|
|
||||||
UserID: metadata.UserID,
|
|
||||||
Hash: metadata.Hash,
|
|
||||||
Status: metadata.Status,
|
|
||||||
Progress: metadata.Progress,
|
|
||||||
Protocol: metadata.Protocol,
|
|
||||||
ImporterIP: metadata.ImporterIP,
|
|
||||||
}
|
|
||||||
|
|
||||||
if __import.Status == "" {
|
|
||||||
__import.Status = models.ImportStatusQueued
|
|
||||||
}
|
|
||||||
|
|
||||||
return m.db.WithContext(ctx).Create(&__import).Error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i ImportServiceDefault) GetImport(ctx context.Context, objectHash []byte) (ImportMetadata, error) {
|
|
||||||
var _import models.Import
|
|
||||||
|
|
||||||
_import.Hash = objectHash
|
|
||||||
|
|
||||||
ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&_import).First(&_import)
|
|
||||||
|
|
||||||
if ret.Error != nil {
|
|
||||||
if errors.Is(ret.Error, gorm.ErrRecordNotFound) {
|
|
||||||
return ImportMetadata{}, ErrNotFound
|
|
||||||
}
|
|
||||||
return ImportMetadata{}, ret.Error
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
return ImportMetadata{
|
|
||||||
ID: _import.ID,
|
|
||||||
UserID: _import.UserID,
|
|
||||||
Hash: _import.Hash,
|
|
||||||
Protocol: _import.Protocol,
|
|
||||||
Status: _import.Status,
|
|
||||||
Progress: _import.Progress,
|
|
||||||
ImporterIP: _import.ImporterIP,
|
|
||||||
Created: _import.CreatedAt,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i ImportServiceDefault) DeleteImport(ctx context.Context, objectHash []byte) error {
|
|
||||||
var _import models.Import
|
|
||||||
|
|
||||||
_import.Hash = objectHash
|
|
||||||
|
|
||||||
ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&_import).Delete(&_import)
|
|
||||||
|
|
||||||
if ret.Error != nil {
|
|
||||||
if errors.Is(ret.Error, gorm.ErrRecordNotFound) {
|
|
||||||
return ErrNotFound
|
|
||||||
}
|
|
||||||
return ret.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type ImportServiceParams struct {
|
|
||||||
fx.In
|
|
||||||
Db *gorm.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewImportService(params ImportServiceParams) *ImportServiceDefault {
|
|
||||||
return &ImportServiceDefault{
|
|
||||||
db: params.Db,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type ImportReader struct {
|
|
||||||
service ImportService
|
|
||||||
meta ImportMetadata
|
|
||||||
reader io.Reader
|
|
||||||
size uint64
|
|
||||||
stage int
|
|
||||||
totalStages int
|
|
||||||
bytesRead uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ImportReader) Seek(offset int64, whence int) (int64, error) {
|
|
||||||
if seeker, ok := i.reader.(io.Seeker); ok {
|
|
||||||
// If seeking to the start, reset progress based on recorded bytes
|
|
||||||
if whence == io.SeekStart && offset == 0 {
|
|
||||||
i.bytesRead = 0
|
|
||||||
i.meta.Progress = 0
|
|
||||||
if err := i.service.SaveImport(context.Background(), i.meta, false); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return seeker.Seek(offset, whence)
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, errors.New("Seek not supported")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ImportReader) Close() error {
|
|
||||||
if closer, ok := i.reader.(io.Closer); ok {
|
|
||||||
return closer.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ImportReader) Read(p []byte) (n int, err error) {
|
|
||||||
n, err = i.reader.Read(p)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update cumulative bytes read
|
|
||||||
i.bytesRead += uint64(n)
|
|
||||||
|
|
||||||
err = i.ReadBytes(n)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (i *ImportReader) ReadBytes(n int) (err error) {
|
|
||||||
stageProgress := float64(100) / float64(i.totalStages)
|
|
||||||
|
|
||||||
// Calculate progress based on bytes read
|
|
||||||
i.meta.Progress = float64(i.bytesRead) / float64(i.size) * 100.0
|
|
||||||
|
|
||||||
// Adjust progress for current stage
|
|
||||||
if i.stage > 1 {
|
|
||||||
i.meta.Progress += float64(i.stage-1) * stageProgress
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure progress doesn't exceed 100%
|
|
||||||
if i.meta.Progress > 100 {
|
|
||||||
i.meta.Progress = 100
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save import progress
|
|
||||||
err = i.service.SaveImport(context.Background(), i.meta, false)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewImportReader(service ImportService, meta ImportMetadata, reader io.Reader, size uint64, stage, totalStages int) *ImportReader {
|
|
||||||
return &ImportReader{
|
|
||||||
service: service,
|
|
||||||
meta: meta,
|
|
||||||
reader: reader,
|
|
||||||
size: size,
|
|
||||||
stage: stage,
|
|
||||||
totalStages: totalStages,
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -13,8 +13,6 @@ import (
|
||||||
|
|
||||||
var ErrNotFound = gorm.ErrRecordNotFound
|
var ErrNotFound = gorm.ErrRecordNotFound
|
||||||
|
|
||||||
var _ MetadataService = (*MetadataServiceDefault)(nil)
|
|
||||||
|
|
||||||
type UploadMetadata struct {
|
type UploadMetadata struct {
|
||||||
ID uint `json:"upload_id"`
|
ID uint `json:"upload_id"`
|
||||||
UserID uint `json:"user_id"`
|
UserID uint `json:"user_id"`
|
||||||
|
|
Loading…
Reference in New Issue