Compare commits

...

4 Commits

5 changed files with 364 additions and 12 deletions

View File

@ -22,6 +22,8 @@ import (
"strconv"
"strings"
_import "git.lumeweb.com/LumeWeb/portal/import"
"git.lumeweb.com/LumeWeb/portal/api/router"
"git.lumeweb.com/LumeWeb/portal/bao"
"git.lumeweb.com/LumeWeb/portal/renter"
@ -88,6 +90,7 @@ type S5API struct {
logger *zap.Logger
tusHandler *s5.TusHandler
cron *cron.CronServiceDefault
_import _import.ImportService
}
type APIParams struct {
@ -102,6 +105,7 @@ type APIParams struct {
Logger *zap.Logger
TusHandler *s5.TusHandler
Cron *cron.CronServiceDefault
Import _import.ImportService
}
type S5ApiResult struct {
@ -122,6 +126,7 @@ func NewS5(params APIParams) (S5ApiResult, error) {
logger: params.Logger,
tusHandler: params.TusHandler,
cron: params.Cron,
_import: params.Import,
}
return S5ApiResult{
API: api,
@ -224,8 +229,9 @@ func (s *S5API) Routes() (*httprouter.Router, error) {
"OPTIONS /s5/download/:cid": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler),
// Pins API
"POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw),
"DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw),
"POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, authMw),
"GET /s5/pin/:cid/status": middleware.ApplyMiddlewares(s.accountPinStatus, middleware.ProxyMiddleware, authMw),
"DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, authMw),
// Debug API
"GET /s5/debug/download_urls/:cid": middleware.ApplyMiddlewares(s.debugDownloadUrls, middleware.ProxyMiddleware, debugCors.Handler),
@ -1010,7 +1016,7 @@ func (s *S5API) accountPinManifest(jc jape.Context, userId uint, cid *encoding.C
error: nil,
cid: cid,
}
err := s.pinEntity(ctx, userId, cid)
err := s.pinEntity(ctx, userId, jc.Request.RemoteAddr, cid)
if err != nil {
s.logger.Error("Error pinning entity", zap.Error(err))
ret.success = false
@ -1103,7 +1109,7 @@ func (s *S5API) accountPin(jc jape.Context) {
s.accountPinManifest(jc, userID, decodedCid, true)
return
} else {
err = s.pinEntity(jc.Request.Context(), userID, decodedCid)
err = s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, decodedCid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
@ -1122,7 +1128,7 @@ func (s *S5API) accountPin(jc jape.Context) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
err := s.pinEntity(jc.Request.Context(), userID, cid)
err := s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, cid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
@ -1134,7 +1140,11 @@ func (s *S5API) accountPin(jc jape.Context) {
jc.ResponseWriter.WriteHeader(http.StatusNoContent)
}
func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) error {
func (s *S5API) accountPinStatus(jc jape.Context) {
}
func (s *S5API) pinEntity(ctx context.Context, userId uint, userIp string, cid *encoding.CID) error {
found := true
if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil {
@ -1253,6 +1263,17 @@ func (s *S5API) pinEntity(ctx context.Context, userId uint, cid *encoding.CID) e
if err != nil {
return nil
}
err = s._import.SaveImport(ctx, _import.ImportMetadata{
UserID: userId,
Hash: cid.Hash.HashBytes(),
Protocol: s5.GetStorageProtocol(s.protocol).Name(),
ImporterIP: userIp,
}, true)
if err != nil {
return err
}
}
return nil
@ -1970,6 +1991,7 @@ func (s *S5API) newFile(params FileParams) *S5File {
func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId uint) error {
ctx := context.Background()
totalStages := 3
// Parse CID early to avoid unnecessary operations if it fails.
parsedCid, err := encoding.CIDFromString(cid)
@ -1978,6 +2000,18 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return err
}
__import, err := s._import.GetImport(ctx, parsedCid.Hash.HashBytes())
if err != nil {
return err
}
__import.Status = models.ImportStatusProcessing
err = s._import.SaveImport(ctx, __import, true)
if err != nil {
return err
}
// Function to streamline error handling and closing of response body.
closeBody := func(body io.ReadCloser) {
if err := body.Close(); err != nil {
@ -1998,7 +2032,10 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
s.logger.Error("error executing request", zap.Error(err))
return nil, err
}
defer closeBody(res.Body)
importReader := _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 1, totalStages)
defer closeBody(importReader)
if res.StatusCode != http.StatusOK {
errMsg := "error fetching URL: " + fetchUrl
@ -2006,7 +2043,7 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return nil, fmt.Errorf(errMsg+" with status: %s", res.Status)
}
data, err := io.ReadAll(res.Body)
data, err := io.ReadAll(importReader)
if err != nil {
s.logger.Error("error reading response body", zap.Error(err))
return nil, err
@ -2020,7 +2057,12 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return err
}
if err := s.accounts.PinByHash(parsedCid.Hash.HashBytes(), userId); err != nil {
if err := s.accounts.PinByHash(upload.Hash, userId); err != nil {
return err
}
err = s._import.DeleteImport(ctx, upload.Hash)
if err != nil {
return err
}
@ -2043,7 +2085,9 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return fmt.Errorf("hash mismatch")
}
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), bytes.NewReader(fileData), nil, hash)
importReader := _import.NewImportReader(s._import, __import, bytes.NewReader(fileData), parsedCid.Size, 2, totalStages)
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), importReader, nil, hash)
if err != nil {
return err
}
@ -2096,11 +2140,13 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
}(verifier)
importReader := _import.NewImportReader(s._import, __import, verifier, parsedCid.Size, 2, totalStages)
if parsedCid.Size < storage.S3_MULTIPART_MIN_PART_SIZE {
_, err = client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
Key: aws.String(cid),
Body: verifier,
Body: importReader,
ContentLength: aws.Int64(int64(parsedCid.Size)),
})
if err != nil {
@ -2108,13 +2154,15 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return err
}
} else {
err := s.storage.S3MultipartUpload(ctx, verifier, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size)
err := s.storage.S3MultipartUpload(ctx, importReader, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size)
if err != nil {
s.logger.Error("error uploading object", zap.Error(err))
return err
}
}
importReader = _import.NewImportReader(s._import, __import, res.Body, parsedCid.Size, 3, totalStages)
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, &renter.MultiPartUploadParams{
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
rangeHeader := "bytes=%d-"
@ -2134,6 +2182,11 @@ func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId
return nil, err
}
err = importReader.ReadBytes(int(end - start))
if err != nil {
return nil, err
}
return object.Body, nil
},
Bucket: s.config.Config().Core.Storage.S3.BufferBucket,

View File

@ -4,6 +4,8 @@ import (
"flag"
"net/http"
_import "git.lumeweb.com/LumeWeb/portal/import"
"git.lumeweb.com/LumeWeb/portal/mailer"
"git.lumeweb.com/LumeWeb/portal/config"
@ -61,6 +63,7 @@ func main() {
cron.Module,
account.Module,
metadata.Module,
_import.Module,
mailer.Module,
protocols.BuildProtocols(cfg),
api.BuildApis(cfg),

25
db/models/import.go Normal file
View File

@ -0,0 +1,25 @@
package models
import "gorm.io/gorm"
type ImportStatus string
const (
ImportStatusQueued ImportStatus = "queued"
ImportStatusProcessing ImportStatus = "processing"
)
func init() {
registerModel(&Upload{})
}
type Import struct {
gorm.Model
UserID uint
Hash []byte `gorm:"type:binary(32);"`
Protocol string
User User
ImporterIP string
Status ImportStatus
Progress float64
}

269
import/import.go Normal file
View File

@ -0,0 +1,269 @@
package _import
import (
"context"
"errors"
"io"
"time"
"git.lumeweb.com/LumeWeb/portal/db/models"
"go.uber.org/fx"
"gorm.io/gorm"
)
var ErrNotFound = gorm.ErrRecordNotFound
var _ ImportService = (*ImportServiceDefault)(nil)
var _ io.ReadSeekCloser = (*ImportReader)(nil)
type ImportMetadata struct {
ID uint
UserID uint
Hash []byte
Status models.ImportStatus
Progress float64
Protocol string
ImporterIP string
Created time.Time
}
type ImportService interface {
SaveImport(ctx context.Context, metadata ImportMetadata, skipExisting bool) error
GetImport(ctx context.Context, objectHash []byte) (ImportMetadata, error)
DeleteImport(ctx context.Context, objectHash []byte) error
}
func (u ImportMetadata) IsEmpty() bool {
if u.UserID != 0 || u.Protocol != "" || u.ImporterIP != "" || u.Status != "" {
return false
}
if !u.Created.IsZero() {
return false
}
if len(u.Hash) != 0 {
return false
}
return true
}
var Module = fx.Module("import",
fx.Provide(
fx.Annotate(
NewImportService,
fx.As(new(ImportService)),
),
),
)
type ImportServiceDefault struct {
db *gorm.DB
}
func (i ImportServiceDefault) SaveImport(ctx context.Context, metadata ImportMetadata, skipExisting bool) error {
var __import models.Import
__import.Hash = metadata.Hash
ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&__import).First(&__import)
if ret.Error != nil {
if errors.Is(ret.Error, gorm.ErrRecordNotFound) {
return i.createImport(ctx, metadata)
}
return ret.Error
}
if skipExisting {
return nil
}
changed := false
if __import.UserID != metadata.UserID {
changed = true
}
if __import.Status != metadata.Status {
changed = true
}
if __import.Progress != metadata.Progress {
changed = true
}
if __import.Protocol != metadata.Protocol {
changed = true
}
if __import.ImporterIP != metadata.ImporterIP {
changed = true
}
if changed {
return i.db.Updates(&__import).Error
}
return nil
}
func (m *ImportServiceDefault) createImport(ctx context.Context, metadata ImportMetadata) error {
__import := models.Import{
UserID: metadata.UserID,
Hash: metadata.Hash,
Status: metadata.Status,
Progress: metadata.Progress,
Protocol: metadata.Protocol,
ImporterIP: metadata.ImporterIP,
}
if __import.Status == "" {
__import.Status = models.ImportStatusQueued
}
return m.db.WithContext(ctx).Create(&__import).Error
}
func (i ImportServiceDefault) GetImport(ctx context.Context, objectHash []byte) (ImportMetadata, error) {
var _import models.Import
_import.Hash = objectHash
ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&_import).First(&_import)
if ret.Error != nil {
if errors.Is(ret.Error, gorm.ErrRecordNotFound) {
return ImportMetadata{}, ErrNotFound
}
return ImportMetadata{}, ret.Error
}
return ImportMetadata{
ID: _import.ID,
UserID: _import.UserID,
Hash: _import.Hash,
Protocol: _import.Protocol,
Status: _import.Status,
Progress: _import.Progress,
ImporterIP: _import.ImporterIP,
Created: _import.CreatedAt,
}, nil
}
func (i ImportServiceDefault) DeleteImport(ctx context.Context, objectHash []byte) error {
var _import models.Import
_import.Hash = objectHash
ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&_import).Delete(&_import)
if ret.Error != nil {
if errors.Is(ret.Error, gorm.ErrRecordNotFound) {
return ErrNotFound
}
return ret.Error
}
return nil
}
type ImportServiceParams struct {
fx.In
Db *gorm.DB
}
func NewImportService(params ImportServiceParams) *ImportServiceDefault {
return &ImportServiceDefault{
db: params.Db,
}
}
type ImportReader struct {
service ImportService
meta ImportMetadata
reader io.Reader
size uint64
stage int
totalStages int
bytesRead uint64
}
func (i *ImportReader) Seek(offset int64, whence int) (int64, error) {
if seeker, ok := i.reader.(io.Seeker); ok {
// If seeking to the start, reset progress based on recorded bytes
if whence == io.SeekStart && offset == 0 {
i.bytesRead = 0
i.meta.Progress = 0
if err := i.service.SaveImport(context.Background(), i.meta, false); err != nil {
return 0, err
}
}
return seeker.Seek(offset, whence)
}
return 0, errors.New("Seek not supported")
}
func (i *ImportReader) Close() error {
if closer, ok := i.reader.(io.Closer); ok {
return closer.Close()
}
return nil
}
func (i *ImportReader) Read(p []byte) (n int, err error) {
n, err = i.reader.Read(p)
if err != nil {
return 0, err
}
// Update cumulative bytes read
i.bytesRead += uint64(n)
err = i.ReadBytes(n)
if err != nil {
return 0, err
}
return n, nil
}
func (i *ImportReader) ReadBytes(n int) (err error) {
stageProgress := float64(100) / float64(i.totalStages)
// Calculate progress based on bytes read
i.meta.Progress = float64(i.bytesRead) / float64(i.size) * 100.0
// Adjust progress for current stage
if i.stage > 1 {
i.meta.Progress += float64(i.stage-1) * stageProgress
}
// Ensure progress doesn't exceed 100%
if i.meta.Progress > 100 {
i.meta.Progress = 100
}
// Save import progress
err = i.service.SaveImport(context.Background(), i.meta, false)
if err != nil {
return err
}
return nil
}
func NewImportReader(service ImportService, meta ImportMetadata, reader io.Reader, size uint64, stage, totalStages int) *ImportReader {
return &ImportReader{
service: service,
meta: meta,
reader: reader,
size: size,
stage: stage,
totalStages: totalStages,
}
}

View File

@ -13,6 +13,8 @@ import (
var ErrNotFound = gorm.ErrRecordNotFound
var _ MetadataService = (*MetadataServiceDefault)(nil)
type UploadMetadata struct {
ID uint `json:"upload_id"`
UserID uint `json:"user_id"`