refactor: move terminateUpload db logic to store

This commit is contained in:
Derrick Hammer 2023-05-23 20:12:48 -04:00
parent e7d1bd0f09
commit 396b3f60a8
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
2 changed files with 44 additions and 43 deletions

View File

@ -76,31 +76,6 @@ func Init() *tusd.Handler {
return tusd.NewHTTPError(errors.New(string(resp)), 304) return tusd.NewHTTPError(errors.New(string(resp)), 304)
} }
return nil
},
PreFinishResponseCallback: func(hook tusd.HookEvent) error {
tusEntry := &model.Tus{
UploadID: hook.Upload.ID,
Hash: hook.Upload.MetaData[HASH_META_HEADER],
}
if err := db.Get().Create(tusEntry).Error; err != nil {
logger.Get().Error("failed to create tus entry", zap.Error(err))
return err
}
if err := shared.GetTusQueue().QueueTask(func(ctx context.Context) error {
upload, err := store.GetUpload(nil, hook.Upload.ID)
if err != nil {
logger.Get().Error("failed to query tus upload", zap.Error(err))
return err
}
return tusWorker(&upload)
}); err != nil {
logger.Get().Error("failed to queue tus upload", zap.Error(err))
return err
}
return nil return nil
}, },
}) })
@ -108,7 +83,10 @@ func Init() *tusd.Handler {
panic(err) panic(err)
} }
shared.SetTusQueue(queue.NewPool(5)) pool := queue.NewPool(5)
shared.SetTusQueue(pool)
shared.SetTusWorker(tusWorker)
go tusStartup() go tusStartup()
@ -119,8 +97,8 @@ func tusStartup() {
result := map[int]model.Tus{} result := map[int]model.Tus{}
db.Get().Table("tus").Take(&result) db.Get().Table("tus").Take(&result)
tusQueue := shared.GetTusQueue() tusQueue := getQueue()
store := shared.GetTusStore() store := getStore()
for _, item := range result { for _, item := range result {
if err := tusQueue.QueueTask(func(ctx context.Context) error { if err := tusQueue.QueueTask(func(ctx context.Context) error {
@ -177,25 +155,12 @@ func tusWorker(upload *tusd.Upload) error {
} }
func terminateUpload(upload tusd.Upload) error { func terminateUpload(upload tusd.Upload) error {
info, _ := upload.GetInfo(context.Background()) err := getComposer().Terminater.AsTerminatableUpload(upload).Terminate(context.Background())
err := shared.GetTusComposer().Terminater.AsTerminatableUpload(upload).Terminate(context.Background())
if err != nil { if err != nil {
logger.Get().Error("failed deleting tus upload", zap.Error(err)) logger.Get().Error("failed deleting tus upload", zap.Error(err))
} }
tusUpload := &model.Tus{UploadID: info.ID}
ret := db.Get().Where(tusUpload).First(&tusUpload)
if ret.Error != nil && ret.Error.Error() != "record not found" {
logger.Get().Error("failed fetching tus entry", zap.Error(err))
err = ret.Error
}
err1 := db.Get().Where(&tusUpload).Delete(&tusUpload)
_ = err1
if err != nil { if err != nil {
return err return err
} }
@ -206,3 +171,17 @@ func terminateUpload(upload tusd.Upload) error {
type UploadResponse struct { type UploadResponse struct {
Cid string `json:"cid"` Cid string `json:"cid"`
} }
func getQueue() *queue.Queue {
ret := shared.GetTusQueue()
return (*ret).(*queue.Queue)
}
func getStore() *tusstore.DbFileStore {
ret := shared.GetTusStore()
return (*ret).(*tusstore.DbFileStore)
}
func getComposer() *tusd.StoreComposer {
ret := shared.GetTusComposer()
return (*ret).(*tusd.StoreComposer)
}

View File

@ -9,10 +9,11 @@ import (
"git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/db"
"git.lumeweb.com/LumeWeb/portal/logger" "git.lumeweb.com/LumeWeb/portal/logger"
"git.lumeweb.com/LumeWeb/portal/model" "git.lumeweb.com/LumeWeb/portal/model"
"git.lumeweb.com/LumeWeb/portal/shared"
"github.com/golang-queue/queue"
"github.com/tus/tusd/pkg/handler" "github.com/tus/tusd/pkg/handler"
"go.uber.org/zap" "go.uber.org/zap"
"io" "io"
"lukechampine.com/blake3"
"os" "os"
"path/filepath" "path/filepath"
) )
@ -267,6 +268,18 @@ func (upload *fileUpload) getInfo() (*model.Tus, bool, error) {
} }
func (upload *fileUpload) FinishUpload(ctx context.Context) error { func (upload *fileUpload) FinishUpload(ctx context.Context) error {
if err := getQueue().QueueTask(func(ctx context.Context) error {
upload, err := getStore().GetUpload(nil, upload.info.ID)
if err != nil {
logger.Get().Error("failed to query tus upload", zap.Error(err))
return err
}
return shared.GetTusWorker()(&upload)
}); err != nil {
logger.Get().Error("failed to queue tus upload", zap.Error(err))
return err
}
return nil return nil
} }
@ -280,3 +293,12 @@ func uid() string {
} }
return hex.EncodeToString(id) return hex.EncodeToString(id)
} }
func getQueue() *queue.Queue {
ret := shared.GetTusQueue()
return (*ret).(*queue.Queue)
}
func getStore() *DbFileStore {
ret := shared.GetTusStore()
return (*ret).(*DbFileStore)
}