diff --git a/tus/tus.go b/tus/tus.go index efcf217..e71ce6c 100644 --- a/tus/tus.go +++ b/tus/tus.go @@ -76,31 +76,6 @@ func Init() *tusd.Handler { return tusd.NewHTTPError(errors.New(string(resp)), 304) } - return nil - }, - PreFinishResponseCallback: func(hook tusd.HookEvent) error { - tusEntry := &model.Tus{ - UploadID: hook.Upload.ID, - Hash: hook.Upload.MetaData[HASH_META_HEADER], - } - - if err := db.Get().Create(tusEntry).Error; err != nil { - logger.Get().Error("failed to create tus entry", zap.Error(err)) - return err - } - - if err := shared.GetTusQueue().QueueTask(func(ctx context.Context) error { - upload, err := store.GetUpload(nil, hook.Upload.ID) - if err != nil { - logger.Get().Error("failed to query tus upload", zap.Error(err)) - return err - } - return tusWorker(&upload) - }); err != nil { - logger.Get().Error("failed to queue tus upload", zap.Error(err)) - return err - } - return nil }, }) @@ -108,7 +83,10 @@ func Init() *tusd.Handler { panic(err) } - shared.SetTusQueue(queue.NewPool(5)) + pool := queue.NewPool(5) + + shared.SetTusQueue(pool) + shared.SetTusWorker(tusWorker) go tusStartup() @@ -119,8 +97,8 @@ func tusStartup() { result := map[int]model.Tus{} db.Get().Table("tus").Take(&result) - tusQueue := shared.GetTusQueue() - store := shared.GetTusStore() + tusQueue := getQueue() + store := getStore() for _, item := range result { if err := tusQueue.QueueTask(func(ctx context.Context) error { @@ -177,25 +155,12 @@ func tusWorker(upload *tusd.Upload) error { } func terminateUpload(upload tusd.Upload) error { - info, _ := upload.GetInfo(context.Background()) - err := shared.GetTusComposer().Terminater.AsTerminatableUpload(upload).Terminate(context.Background()) + err := getComposer().Terminater.AsTerminatableUpload(upload).Terminate(context.Background()) if err != nil { logger.Get().Error("failed deleting tus upload", zap.Error(err)) } - tusUpload := &model.Tus{UploadID: info.ID} - ret := db.Get().Where(tusUpload).First(&tusUpload) - - if ret.Error != nil && ret.Error.Error() != "record not found" { - logger.Get().Error("failed fetching tus entry", zap.Error(err)) - err = ret.Error - } - - err1 := db.Get().Where(&tusUpload).Delete(&tusUpload) - - _ = err1 - if err != nil { return err } @@ -206,3 +171,17 @@ func terminateUpload(upload tusd.Upload) error { type UploadResponse struct { Cid string `json:"cid"` } + +func getQueue() *queue.Queue { + ret := shared.GetTusQueue() + return (*ret).(*queue.Queue) +} + +func getStore() *tusstore.DbFileStore { + ret := shared.GetTusStore() + return (*ret).(*tusstore.DbFileStore) +} +func getComposer() *tusd.StoreComposer { + ret := shared.GetTusComposer() + return (*ret).(*tusd.StoreComposer) +} diff --git a/tusstore/store.go b/tusstore/store.go index c4aed09..2fea2d1 100644 --- a/tusstore/store.go +++ b/tusstore/store.go @@ -9,10 +9,11 @@ import ( "git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/logger" "git.lumeweb.com/LumeWeb/portal/model" + "git.lumeweb.com/LumeWeb/portal/shared" + "github.com/golang-queue/queue" "github.com/tus/tusd/pkg/handler" "go.uber.org/zap" "io" - "lukechampine.com/blake3" "os" "path/filepath" ) @@ -267,6 +268,18 @@ func (upload *fileUpload) getInfo() (*model.Tus, bool, error) { } func (upload *fileUpload) FinishUpload(ctx context.Context) error { + if err := getQueue().QueueTask(func(ctx context.Context) error { + upload, err := getStore().GetUpload(nil, upload.info.ID) + if err != nil { + logger.Get().Error("failed to query tus upload", zap.Error(err)) + return err + } + return shared.GetTusWorker()(&upload) + }); err != nil { + logger.Get().Error("failed to queue tus upload", zap.Error(err)) + return err + } + return nil } @@ -280,3 +293,12 @@ func uid() string { } return hex.EncodeToString(id) } +func getQueue() *queue.Queue { + ret := shared.GetTusQueue() + return (*ret).(*queue.Queue) +} + +func getStore() *DbFileStore { + ret := shared.GetTusStore() + return (*ret).(*DbFileStore) +}