From 0e3a25aa8abc04299f096d941d94f1ae329bf386 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 22 Mar 2024 15:52:11 -0400 Subject: [PATCH] feat: add import metadata db service based off the upload metadata service --- cmd/portal/main.go | 3 + db/models/import.go | 25 ++++++ import/import.go | 182 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 210 insertions(+) create mode 100644 db/models/import.go create mode 100644 import/import.go diff --git a/cmd/portal/main.go b/cmd/portal/main.go index 1e8a2c2..c8ca50d 100644 --- a/cmd/portal/main.go +++ b/cmd/portal/main.go @@ -4,6 +4,8 @@ import ( "flag" "net/http" + _import "git.lumeweb.com/LumeWeb/portal/import" + "git.lumeweb.com/LumeWeb/portal/mailer" "git.lumeweb.com/LumeWeb/portal/config" @@ -61,6 +63,7 @@ func main() { cron.Module, account.Module, metadata.Module, + _import.Module, mailer.Module, protocols.BuildProtocols(cfg), api.BuildApis(cfg), diff --git a/db/models/import.go b/db/models/import.go new file mode 100644 index 0000000..4e3d895 --- /dev/null +++ b/db/models/import.go @@ -0,0 +1,25 @@ +package models + +import "gorm.io/gorm" + +type ImportStatus string + +const ( + ImportStatusQueued ImportStatus = "queued" + ImportStatusProcessing ImportStatus = "processing" +) + +func init() { + registerModel(&Upload{}) +} + +type Import struct { + gorm.Model + UserID uint + Hash []byte `gorm:"type:binary(32);"` + Protocol string + User User + ImporterIP string + Status ImportStatus + Progress float64 +} diff --git a/import/import.go b/import/import.go new file mode 100644 index 0000000..7459b47 --- /dev/null +++ b/import/import.go @@ -0,0 +1,182 @@ +package _import + +import ( + "context" + "errors" + "io" + "time" + + "git.lumeweb.com/LumeWeb/portal/db/models" + + "go.uber.org/fx" + "gorm.io/gorm" +) + +var ErrNotFound = gorm.ErrRecordNotFound + +var _ ImportService = (*ImportServiceDefault)(nil) +var _ io.Reader = (*ImportReader)(nil) + +type ImportMetadata struct { + ID uint + UserID uint + Hash []byte + Status models.ImportStatus + Progress float64 + Protocol string + ImporterIP string + Created time.Time +} + +type ImportService interface { + SaveImport(ctx context.Context, metadata ImportMetadata, skipExisting bool) error + GetImport(ctx context.Context, objectHash []byte) (ImportMetadata, error) + DeleteImport(ctx context.Context, objectHash []byte) error +} + +func (u ImportMetadata) IsEmpty() bool { + if u.UserID != 0 || u.Protocol != "" || u.ImporterIP != "" || u.Status != "" { + return false + } + + if !u.Created.IsZero() { + return false + } + + if len(u.Hash) != 0 { + return false + } + + return true +} + +var Module = fx.Module("import", + fx.Provide( + fx.Annotate( + NewImportService, + fx.As(new(ImportService)), + ), + ), +) + +type ImportServiceDefault struct { + db *gorm.DB +} + +func (i ImportServiceDefault) SaveImport(ctx context.Context, metadata ImportMetadata, skipExisting bool) error { + var __import models.Import + + __import.Hash = metadata.Hash + + ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&__import).First(&__import) + + if ret.Error != nil { + if errors.Is(ret.Error, gorm.ErrRecordNotFound) { + return i.createImport(ctx, metadata) + } + return ret.Error + } + + if skipExisting { + return nil + } + + changed := false + + if __import.UserID != metadata.UserID { + changed = true + } + + if __import.Status != metadata.Status { + changed = true + } + + if __import.Progress != metadata.Progress { + changed = true + } + + if __import.Protocol != metadata.Protocol { + changed = true + } + + if __import.ImporterIP != metadata.ImporterIP { + changed = true + } + if changed { + return i.db.Updates(&__import).Error + } + + return nil +} + +func (m *ImportServiceDefault) createImport(ctx context.Context, metadata ImportMetadata) error { + __import := models.Import{ + UserID: metadata.UserID, + Hash: metadata.Hash, + Status: metadata.Status, + Progress: metadata.Progress, + Protocol: metadata.Protocol, + ImporterIP: metadata.ImporterIP, + } + + if __import.Status == "" { + __import.Status = models.ImportStatusQueued + } + + return m.db.WithContext(ctx).Create(&__import).Error +} + +func (i ImportServiceDefault) GetImport(ctx context.Context, objectHash []byte) (ImportMetadata, error) { + var _import models.Import + + _import.Hash = objectHash + + ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&_import).First(&_import) + + if ret.Error != nil { + if errors.Is(ret.Error, gorm.ErrRecordNotFound) { + return ImportMetadata{}, ErrNotFound + } + return ImportMetadata{}, ret.Error + + } + + return ImportMetadata{ + ID: _import.ID, + UserID: _import.UserID, + Hash: _import.Hash, + Protocol: _import.Protocol, + Status: _import.Status, + Progress: _import.Progress, + ImporterIP: _import.ImporterIP, + Created: _import.CreatedAt, + }, nil +} + +func (i ImportServiceDefault) DeleteImport(ctx context.Context, objectHash []byte) error { + var _import models.Import + + _import.Hash = objectHash + + ret := i.db.WithContext(ctx).Model(&models.Import{}).Where(&_import).Delete(&_import) + + if ret.Error != nil { + if errors.Is(ret.Error, gorm.ErrRecordNotFound) { + return ErrNotFound + } + return ret.Error + } + + return nil +} + +type ImportServiceParams struct { + fx.In + Db *gorm.DB +} + +func NewImportService(params ImportServiceParams) *ImportServiceDefault { + return &ImportServiceDefault{ + db: params.Db, + } +}