feat: tus support
This commit is contained in:
parent
a8d2ad3393
commit
3005be6fec
2
db/db.go
2
db/db.go
|
@ -53,7 +53,7 @@ func Init() {
|
|||
}
|
||||
|
||||
// Automatically migrate the database schema based on the model definitions.
|
||||
err = db.Migrator().AutoMigrate(&model.Account{}, &model.Key{}, &model.KeyChallenge{}, &model.LoginSession{}, &model.Upload{}, &model.Pin{})
|
||||
err = db.Migrator().AutoMigrate(&model.Account{}, &model.Key{}, &model.KeyChallenge{}, &model.LoginSession{}, &model.Upload{}, &model.Pin{}, &model.Tus{})
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("Database setup failed database type: %s \n", err))
|
||||
}
|
||||
|
|
3
main.go
3
main.go
|
@ -78,7 +78,8 @@ func main() {
|
|||
|
||||
tus := initTus()
|
||||
|
||||
app.Any(API_PATH+"{fileparam:path}", iris.FromStd(http.StripPrefix(API_PATH, tus)))
|
||||
v1.Any(TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH+"/", tus)))
|
||||
v1.Post(TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH, tus)))
|
||||
|
||||
swaggerConfig := swagger.Config{
|
||||
// The url pointing to API definition.
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Tus struct {
|
||||
gorm.Model
|
||||
UploadID string `gorm:"primaryKey"`
|
||||
Path string
|
||||
Hash string
|
||||
}
|
|
@ -0,0 +1,165 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.lumeweb.com/LumeWeb/portal/cid"
|
||||
"git.lumeweb.com/LumeWeb/portal/db"
|
||||
"git.lumeweb.com/LumeWeb/portal/model"
|
||||
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||
"github.com/golang-queue/queue"
|
||||
"github.com/tus/tusd/pkg/filestore"
|
||||
tusd "github.com/tus/tusd/pkg/handler"
|
||||
"github.com/tus/tusd/pkg/memorylocker"
|
||||
"io"
|
||||
"log"
|
||||
)
|
||||
|
||||
const TUS_API_PATH = "/files/tus"
|
||||
|
||||
const HASH_META_HEADER = "blake3-hash"
|
||||
|
||||
var tusQueue *queue.Queue
|
||||
var store *filestore.FileStore
|
||||
var composer *tusd.StoreComposer
|
||||
|
||||
func initTus() *tusd.Handler {
|
||||
store = &filestore.FileStore{
|
||||
Path: "/tmp",
|
||||
}
|
||||
|
||||
composer := tusd.NewStoreComposer()
|
||||
composer.UseCore(store)
|
||||
composer.UseConcater(store)
|
||||
composer.UseLocker(memorylocker.New())
|
||||
composer.UseTerminater(store)
|
||||
|
||||
handler, err := tusd.NewHandler(tusd.Config{
|
||||
BasePath: "/api/v1" + TUS_API_PATH,
|
||||
StoreComposer: composer,
|
||||
PreUploadCreateCallback: func(hook tusd.HookEvent) error {
|
||||
hash := hook.Upload.MetaData[HASH_META_HEADER]
|
||||
|
||||
if len(hash) == 0 {
|
||||
return errors.New("missing blake3-hash metadata")
|
||||
}
|
||||
|
||||
var upload model.Upload
|
||||
result := db.Get().Where(&model.Upload{Hash: hash}).First(&upload)
|
||||
if (result.Error != nil && result.Error.Error() != "record not found") || result.RowsAffected > 0 {
|
||||
hashBytes, err := hex.DecodeString(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cidString, err := cid.Encode(hashBytes, uint64(hook.Upload.Size))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := json.Marshal(UploadResponse{Cid: cidString})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tusd.NewHTTPError(errors.New(string(resp)), 304)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
PreFinishResponseCallback: func(hook tusd.HookEvent) error {
|
||||
tusEntry := &model.Tus{
|
||||
Path: hook.Upload.Storage["Path"],
|
||||
Hash: hook.Upload.MetaData[HASH_META_HEADER],
|
||||
}
|
||||
|
||||
if err := db.Get().Create(tusEntry).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
||||
upload, err := store.GetUpload(nil, hook.Upload.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tusWorker(&upload)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
tusQueue = queue.NewPool(5)
|
||||
|
||||
go tusStartup()
|
||||
|
||||
return handler
|
||||
}
|
||||
|
||||
func tusStartup() {
|
||||
result := map[int]model.Tus{}
|
||||
db.Get().Table("tus").Take(&result)
|
||||
|
||||
for _, item := range result {
|
||||
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
||||
upload, err := store.GetUpload(nil, item.UploadID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tusWorker(&upload)
|
||||
}); err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tusWorker(upload *tusd.Upload) error {
|
||||
info, err := (*upload).GetInfo(nil)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return err
|
||||
}
|
||||
file, err := (*upload).GetReader(nil)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = files.Upload(file.(io.ReadSeeker))
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return err
|
||||
}
|
||||
|
||||
hash := info.MetaData[HASH_META_HEADER]
|
||||
|
||||
var tusUpload model.Tus
|
||||
ret := db.Get().Where(&model.Tus{Hash: hash}).First(&tusUpload)
|
||||
if ret.Error != nil && ret.Error.Error() != "record not found" {
|
||||
log.Print(ret.Error)
|
||||
return err
|
||||
}
|
||||
|
||||
ret = db.Get().Delete(&tusUpload)
|
||||
|
||||
err = composer.Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
|
||||
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type UploadResponse struct {
|
||||
Cid string `json:"cid"`
|
||||
}
|
Loading…
Reference in New Issue