Compare commits

..

4 Commits

5 changed files with 85 additions and 25 deletions

View File

@ -8,6 +8,7 @@ import (
_ "git.lumeweb.com/LumeWeb/portal/docs" _ "git.lumeweb.com/LumeWeb/portal/docs"
"git.lumeweb.com/LumeWeb/portal/renterd" "git.lumeweb.com/LumeWeb/portal/renterd"
"git.lumeweb.com/LumeWeb/portal/service/files" "git.lumeweb.com/LumeWeb/portal/service/files"
"git.lumeweb.com/LumeWeb/portal/tus"
"git.lumeweb.com/LumeWeb/portal/validator" "git.lumeweb.com/LumeWeb/portal/validator"
"github.com/iris-contrib/swagger" "github.com/iris-contrib/swagger"
"github.com/iris-contrib/swagger/swaggerFiles" "github.com/iris-contrib/swagger/swaggerFiles"
@ -76,10 +77,10 @@ func main() {
app.Handle(new(controller.FilesController)) app.Handle(new(controller.FilesController))
}) })
tus := initTus() tusHandler := tus.Init()
v1.Any(TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH+"/", tus))) v1.Any(tus.TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH+"/", tusHandler)))
v1.Post(TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH, tus))) v1.Post(tus.TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH, tusHandler)))
swaggerConfig := swagger.Config{ swaggerConfig := swagger.Config{
// The url pointing to API definition. // The url pointing to API definition.

View File

@ -7,6 +7,6 @@ import (
type Tus struct { type Tus struct {
gorm.Model gorm.Model
UploadID string `gorm:"primaryKey"` UploadID string `gorm:"primaryKey"`
Path string Id string
Hash string Hash string
} }

View File

@ -2,6 +2,7 @@ package files
import ( import (
"bufio" "bufio"
"context"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -9,6 +10,7 @@ import (
"git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/db"
"git.lumeweb.com/LumeWeb/portal/model" "git.lumeweb.com/LumeWeb/portal/model"
"git.lumeweb.com/LumeWeb/portal/renterd" "git.lumeweb.com/LumeWeb/portal/renterd"
"git.lumeweb.com/LumeWeb/portal/shared"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"io" "io"
"lukechampine.com/blake3" "lukechampine.com/blake3"
@ -134,16 +136,35 @@ func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
return upload, nil return upload, nil
} }
func Download(hash string) (io.Reader, error) { func Download(hash string) (io.Reader, error) {
result := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row() uploadItem := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row()
tusItem := db.Get().Table("tus").Where(&model.Tus{Hash: hash}).Row()
if result.Err() != nil { if uploadItem.Err() == nil {
return nil, result.Err() fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
if err != nil {
return nil, err
}
return fetch.RawBody(), nil
} else if tusItem.Err() == nil {
var tusData model.Tus
err := tusItem.Scan(&tusData)
if err != nil {
return nil, err
}
upload, err := shared.GetTusStore().GetUpload(context.Background(), tusData.Id)
if err != nil {
return nil, err
}
reader, err := upload.GetReader(context.Background())
if err != nil {
return nil, err
}
return reader, nil
} else {
return nil, errors.New("invalid file")
} }
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
if err != nil {
return nil, err
}
return fetch.RawBody(), nil
} }

35
shared/shared.go Normal file
View File

@ -0,0 +1,35 @@
package shared
import (
"github.com/golang-queue/queue"
"github.com/tus/tusd/pkg/filestore"
tusd "github.com/tus/tusd/pkg/handler"
)
var tusQueue *queue.Queue
var tusStore *filestore.FileStore
var tusComposer *tusd.StoreComposer
func SetTusQueue(q *queue.Queue) {
tusQueue = q
}
func GetTusQueue() *queue.Queue {
return tusQueue
}
func SetTusStore(s *filestore.FileStore) {
tusStore = s
}
func GetTusStore() *filestore.FileStore {
return tusStore
}
func SetTusComposer(c *tusd.StoreComposer) {
tusComposer = c
}
func GetTusComposer() *tusd.StoreComposer {
return tusComposer
}

View File

@ -1,4 +1,4 @@
package main package tus
import ( import (
"context" "context"
@ -9,6 +9,7 @@ import (
"git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/db"
"git.lumeweb.com/LumeWeb/portal/model" "git.lumeweb.com/LumeWeb/portal/model"
"git.lumeweb.com/LumeWeb/portal/service/files" "git.lumeweb.com/LumeWeb/portal/service/files"
"git.lumeweb.com/LumeWeb/portal/shared"
"github.com/golang-queue/queue" "github.com/golang-queue/queue"
"github.com/tus/tusd/pkg/filestore" "github.com/tus/tusd/pkg/filestore"
tusd "github.com/tus/tusd/pkg/handler" tusd "github.com/tus/tusd/pkg/handler"
@ -21,15 +22,13 @@ const TUS_API_PATH = "/files/tus"
const HASH_META_HEADER = "blake3-hash" const HASH_META_HEADER = "blake3-hash"
var tusQueue *queue.Queue func Init() *tusd.Handler {
var store *filestore.FileStore store := &filestore.FileStore{
var composer *tusd.StoreComposer
func initTus() *tusd.Handler {
store = &filestore.FileStore{
Path: "/tmp", Path: "/tmp",
} }
shared.SetTusStore(store)
composer := tusd.NewStoreComposer() composer := tusd.NewStoreComposer()
composer.UseCore(store) composer.UseCore(store)
composer.UseConcater(store) composer.UseConcater(store)
@ -73,7 +72,7 @@ func initTus() *tusd.Handler {
}, },
PreFinishResponseCallback: func(hook tusd.HookEvent) error { PreFinishResponseCallback: func(hook tusd.HookEvent) error {
tusEntry := &model.Tus{ tusEntry := &model.Tus{
Path: hook.Upload.Storage["Path"], Id: hook.Upload.ID,
Hash: hook.Upload.MetaData[HASH_META_HEADER], Hash: hook.Upload.MetaData[HASH_META_HEADER],
} }
@ -81,7 +80,7 @@ func initTus() *tusd.Handler {
return err return err
} }
if err := tusQueue.QueueTask(func(ctx context.Context) error { if err := shared.GetTusQueue().QueueTask(func(ctx context.Context) error {
upload, err := store.GetUpload(nil, hook.Upload.ID) upload, err := store.GetUpload(nil, hook.Upload.ID)
if err != nil { if err != nil {
return err return err
@ -97,7 +96,8 @@ func initTus() *tusd.Handler {
if err != nil { if err != nil {
panic(err) panic(err)
} }
tusQueue = queue.NewPool(5)
shared.SetTusQueue(queue.NewPool(5))
go tusStartup() go tusStartup()
@ -108,6 +108,9 @@ func tusStartup() {
result := map[int]model.Tus{} result := map[int]model.Tus{}
db.Get().Table("tus").Take(&result) db.Get().Table("tus").Take(&result)
tusQueue := shared.GetTusQueue()
store := shared.GetTusStore()
for _, item := range result { for _, item := range result {
if err := tusQueue.QueueTask(func(ctx context.Context) error { if err := tusQueue.QueueTask(func(ctx context.Context) error {
upload, err := store.GetUpload(nil, item.UploadID) upload, err := store.GetUpload(nil, item.UploadID)
@ -150,7 +153,7 @@ func tusWorker(upload *tusd.Upload) error {
ret = db.Get().Delete(&tusUpload) ret = db.Get().Delete(&tusUpload)
err = composer.Terminater.AsTerminatableUpload(*upload).Terminate(context.Background()) err = shared.GetTusComposer().Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
if err != nil { if err != nil {
log.Print(err) log.Print(err)