diff --git a/db/db.go b/db/db.go index 4f4f5d3..715a50f 100644 --- a/db/db.go +++ b/db/db.go @@ -53,7 +53,7 @@ func Init() { } // Automatically migrate the database schema based on the model definitions. - err = db.Migrator().AutoMigrate(&model.Account{}, &model.Key{}, &model.KeyChallenge{}, &model.LoginSession{}, &model.Upload{}, &model.Pin{}) + err = db.Migrator().AutoMigrate(&model.Account{}, &model.Key{}, &model.KeyChallenge{}, &model.LoginSession{}, &model.Upload{}, &model.Pin{}, &model.Tus{}) if err != nil { panic(fmt.Errorf("Database setup failed database type: %s \n", err)) } diff --git a/main.go b/main.go index 3f29776..91ebffd 100644 --- a/main.go +++ b/main.go @@ -78,7 +78,8 @@ func main() { tus := initTus() - app.Any(API_PATH+"{fileparam:path}", iris.FromStd(http.StripPrefix(API_PATH, tus))) + v1.Any(TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH+"/", tus))) + v1.Post(TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH, tus))) swaggerConfig := swagger.Config{ // The url pointing to API definition. diff --git a/model/tus.go b/model/tus.go new file mode 100644 index 0000000..9530caf --- /dev/null +++ b/model/tus.go @@ -0,0 +1,12 @@ +package model + +import ( + "gorm.io/gorm" +) + +type Tus struct { + gorm.Model + UploadID string `gorm:"primaryKey"` + Path string + Hash string +} diff --git a/tus.go b/tus.go new file mode 100644 index 0000000..24ed3b1 --- /dev/null +++ b/tus.go @@ -0,0 +1,165 @@ +package main + +import ( + "context" + "encoding/hex" + "encoding/json" + "errors" + "git.lumeweb.com/LumeWeb/portal/cid" + "git.lumeweb.com/LumeWeb/portal/db" + "git.lumeweb.com/LumeWeb/portal/model" + "git.lumeweb.com/LumeWeb/portal/service/files" + "github.com/golang-queue/queue" + "github.com/tus/tusd/pkg/filestore" + tusd "github.com/tus/tusd/pkg/handler" + "github.com/tus/tusd/pkg/memorylocker" + "io" + "log" +) + +const TUS_API_PATH = "/files/tus" + +const HASH_META_HEADER = "blake3-hash" + +var tusQueue *queue.Queue +var store *filestore.FileStore +var composer *tusd.StoreComposer + +func initTus() *tusd.Handler { + store = &filestore.FileStore{ + Path: "/tmp", + } + + composer := tusd.NewStoreComposer() + composer.UseCore(store) + composer.UseConcater(store) + composer.UseLocker(memorylocker.New()) + composer.UseTerminater(store) + + handler, err := tusd.NewHandler(tusd.Config{ + BasePath: "/api/v1" + TUS_API_PATH, + StoreComposer: composer, + PreUploadCreateCallback: func(hook tusd.HookEvent) error { + hash := hook.Upload.MetaData[HASH_META_HEADER] + + if len(hash) == 0 { + return errors.New("missing blake3-hash metadata") + } + + var upload model.Upload + result := db.Get().Where(&model.Upload{Hash: hash}).First(&upload) + if (result.Error != nil && result.Error.Error() != "record not found") || result.RowsAffected > 0 { + hashBytes, err := hex.DecodeString(hash) + if err != nil { + return err + } + + cidString, err := cid.Encode(hashBytes, uint64(hook.Upload.Size)) + + if err != nil { + return err + } + + resp, err := json.Marshal(UploadResponse{Cid: cidString}) + + if err != nil { + return err + } + + return tusd.NewHTTPError(errors.New(string(resp)), 304) + } + + return nil + }, + PreFinishResponseCallback: func(hook tusd.HookEvent) error { + tusEntry := &model.Tus{ + Path: hook.Upload.Storage["Path"], + Hash: hook.Upload.MetaData[HASH_META_HEADER], + } + + if err := db.Get().Create(tusEntry).Error; err != nil { + return err + } + + if err := tusQueue.QueueTask(func(ctx context.Context) error { + upload, err := store.GetUpload(nil, hook.Upload.ID) + if err != nil { + return err + } + return tusWorker(&upload) + }); err != nil { + return err + } + + return nil + }, + }) + if err != nil { + panic(err) + } + tusQueue = queue.NewPool(5) + + go tusStartup() + + return handler +} + +func tusStartup() { + result := map[int]model.Tus{} + db.Get().Table("tus").Take(&result) + + for _, item := range result { + if err := tusQueue.QueueTask(func(ctx context.Context) error { + upload, err := store.GetUpload(nil, item.UploadID) + if err != nil { + return err + } + return tusWorker(&upload) + }); err != nil { + log.Print(err) + } + } +} + +func tusWorker(upload *tusd.Upload) error { + info, err := (*upload).GetInfo(nil) + if err != nil { + log.Print(err) + return err + } + file, err := (*upload).GetReader(nil) + if err != nil { + log.Print(err) + return err + } + + _, err = files.Upload(file.(io.ReadSeeker)) + if err != nil { + log.Print(err) + return err + } + + hash := info.MetaData[HASH_META_HEADER] + + var tusUpload model.Tus + ret := db.Get().Where(&model.Tus{Hash: hash}).First(&tusUpload) + if ret.Error != nil && ret.Error.Error() != "record not found" { + log.Print(ret.Error) + return err + } + + ret = db.Get().Delete(&tusUpload) + + err = composer.Terminater.AsTerminatableUpload(*upload).Terminate(context.Background()) + + if err != nil { + log.Print(err) + return err + } + + return nil +} + +type UploadResponse struct { + Cid string `json:"cid"` +}