Compare commits

...

4 Commits

5 changed files with 85 additions and 25 deletions

View File

@ -8,6 +8,7 @@ import (
_ "git.lumeweb.com/LumeWeb/portal/docs"
"git.lumeweb.com/LumeWeb/portal/renterd"
"git.lumeweb.com/LumeWeb/portal/service/files"
"git.lumeweb.com/LumeWeb/portal/tus"
"git.lumeweb.com/LumeWeb/portal/validator"
"github.com/iris-contrib/swagger"
"github.com/iris-contrib/swagger/swaggerFiles"
@ -76,10 +77,10 @@ func main() {
app.Handle(new(controller.FilesController))
})
tus := initTus()
tusHandler := tus.Init()
v1.Any(TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH+"/", tus)))
v1.Post(TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH, tus)))
v1.Any(tus.TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH+"/", tusHandler)))
v1.Post(tus.TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH, tusHandler)))
swaggerConfig := swagger.Config{
// The url pointing to API definition.

View File

@ -7,6 +7,6 @@ import (
type Tus struct {
gorm.Model
UploadID string `gorm:"primaryKey"`
Path string
Id string
Hash string
}

View File

@ -2,6 +2,7 @@ package files
import (
"bufio"
"context"
"encoding/hex"
"errors"
"fmt"
@ -9,6 +10,7 @@ import (
"git.lumeweb.com/LumeWeb/portal/db"
"git.lumeweb.com/LumeWeb/portal/model"
"git.lumeweb.com/LumeWeb/portal/renterd"
"git.lumeweb.com/LumeWeb/portal/shared"
"github.com/go-resty/resty/v2"
"io"
"lukechampine.com/blake3"
@ -134,16 +136,35 @@ func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
return upload, nil
}
func Download(hash string) (io.Reader, error) {
result := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row()
uploadItem := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row()
tusItem := db.Get().Table("tus").Where(&model.Tus{Hash: hash}).Row()
if result.Err() != nil {
return nil, result.Err()
if uploadItem.Err() == nil {
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
if err != nil {
return nil, err
}
return fetch.RawBody(), nil
} else if tusItem.Err() == nil {
var tusData model.Tus
err := tusItem.Scan(&tusData)
if err != nil {
return nil, err
}
upload, err := shared.GetTusStore().GetUpload(context.Background(), tusData.Id)
if err != nil {
return nil, err
}
reader, err := upload.GetReader(context.Background())
if err != nil {
return nil, err
}
return reader, nil
} else {
return nil, errors.New("invalid file")
}
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
if err != nil {
return nil, err
}
return fetch.RawBody(), nil
}

35
shared/shared.go Normal file
View File

@ -0,0 +1,35 @@
package shared
import (
"github.com/golang-queue/queue"
"github.com/tus/tusd/pkg/filestore"
tusd "github.com/tus/tusd/pkg/handler"
)
var tusQueue *queue.Queue
var tusStore *filestore.FileStore
var tusComposer *tusd.StoreComposer
func SetTusQueue(q *queue.Queue) {
tusQueue = q
}
func GetTusQueue() *queue.Queue {
return tusQueue
}
func SetTusStore(s *filestore.FileStore) {
tusStore = s
}
func GetTusStore() *filestore.FileStore {
return tusStore
}
func SetTusComposer(c *tusd.StoreComposer) {
tusComposer = c
}
func GetTusComposer() *tusd.StoreComposer {
return tusComposer
}

View File

@ -1,4 +1,4 @@
package main
package tus
import (
"context"
@ -9,6 +9,7 @@ import (
"git.lumeweb.com/LumeWeb/portal/db"
"git.lumeweb.com/LumeWeb/portal/model"
"git.lumeweb.com/LumeWeb/portal/service/files"
"git.lumeweb.com/LumeWeb/portal/shared"
"github.com/golang-queue/queue"
"github.com/tus/tusd/pkg/filestore"
tusd "github.com/tus/tusd/pkg/handler"
@ -21,15 +22,13 @@ const TUS_API_PATH = "/files/tus"
const HASH_META_HEADER = "blake3-hash"
var tusQueue *queue.Queue
var store *filestore.FileStore
var composer *tusd.StoreComposer
func initTus() *tusd.Handler {
store = &filestore.FileStore{
func Init() *tusd.Handler {
store := &filestore.FileStore{
Path: "/tmp",
}
shared.SetTusStore(store)
composer := tusd.NewStoreComposer()
composer.UseCore(store)
composer.UseConcater(store)
@ -73,7 +72,7 @@ func initTus() *tusd.Handler {
},
PreFinishResponseCallback: func(hook tusd.HookEvent) error {
tusEntry := &model.Tus{
Path: hook.Upload.Storage["Path"],
Id: hook.Upload.ID,
Hash: hook.Upload.MetaData[HASH_META_HEADER],
}
@ -81,7 +80,7 @@ func initTus() *tusd.Handler {
return err
}
if err := tusQueue.QueueTask(func(ctx context.Context) error {
if err := shared.GetTusQueue().QueueTask(func(ctx context.Context) error {
upload, err := store.GetUpload(nil, hook.Upload.ID)
if err != nil {
return err
@ -97,7 +96,8 @@ func initTus() *tusd.Handler {
if err != nil {
panic(err)
}
tusQueue = queue.NewPool(5)
shared.SetTusQueue(queue.NewPool(5))
go tusStartup()
@ -108,6 +108,9 @@ func tusStartup() {
result := map[int]model.Tus{}
db.Get().Table("tus").Take(&result)
tusQueue := shared.GetTusQueue()
store := shared.GetTusStore()
for _, item := range result {
if err := tusQueue.QueueTask(func(ctx context.Context) error {
upload, err := store.GetUpload(nil, item.UploadID)
@ -150,7 +153,7 @@ func tusWorker(upload *tusd.Upload) error {
ret = db.Get().Delete(&tusUpload)
err = composer.Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
err = shared.GetTusComposer().Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
if err != nil {
log.Print(err)