Compare commits

..

No commits in common. "2f514c02be0bd113b1ad4cdd930a1e15d4b57fb4" and "673f7c6dfd4cee1386462bb3841ce2742fe96b15" have entirely different histories.

5 changed files with 25 additions and 85 deletions

View File

@ -8,7 +8,6 @@ import (
_ "git.lumeweb.com/LumeWeb/portal/docs" _ "git.lumeweb.com/LumeWeb/portal/docs"
"git.lumeweb.com/LumeWeb/portal/renterd" "git.lumeweb.com/LumeWeb/portal/renterd"
"git.lumeweb.com/LumeWeb/portal/service/files" "git.lumeweb.com/LumeWeb/portal/service/files"
"git.lumeweb.com/LumeWeb/portal/tus"
"git.lumeweb.com/LumeWeb/portal/validator" "git.lumeweb.com/LumeWeb/portal/validator"
"github.com/iris-contrib/swagger" "github.com/iris-contrib/swagger"
"github.com/iris-contrib/swagger/swaggerFiles" "github.com/iris-contrib/swagger/swaggerFiles"
@ -77,10 +76,10 @@ func main() {
app.Handle(new(controller.FilesController)) app.Handle(new(controller.FilesController))
}) })
tusHandler := tus.Init() tus := initTus()
v1.Any(tus.TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH+"/", tusHandler))) v1.Any(TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH+"/", tus)))
v1.Post(tus.TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH, tusHandler))) v1.Post(TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH, tus)))
swaggerConfig := swagger.Config{ swaggerConfig := swagger.Config{
// The url pointing to API definition. // The url pointing to API definition.

View File

@ -7,6 +7,6 @@ import (
type Tus struct { type Tus struct {
gorm.Model gorm.Model
UploadID string `gorm:"primaryKey"` UploadID string `gorm:"primaryKey"`
Id string Path string
Hash string Hash string
} }

View File

@ -2,7 +2,6 @@ package files
import ( import (
"bufio" "bufio"
"context"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -10,7 +9,6 @@ import (
"git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/db"
"git.lumeweb.com/LumeWeb/portal/model" "git.lumeweb.com/LumeWeb/portal/model"
"git.lumeweb.com/LumeWeb/portal/renterd" "git.lumeweb.com/LumeWeb/portal/renterd"
"git.lumeweb.com/LumeWeb/portal/shared"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"io" "io"
"lukechampine.com/blake3" "lukechampine.com/blake3"
@ -136,35 +134,16 @@ func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
return upload, nil return upload, nil
} }
func Download(hash string) (io.Reader, error) { func Download(hash string) (io.Reader, error) {
uploadItem := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row() result := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row()
tusItem := db.Get().Table("tus").Where(&model.Tus{Hash: hash}).Row()
if result.Err() != nil {
return nil, result.Err()
}
if uploadItem.Err() == nil {
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash)) fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
if err != nil { if err != nil {
return nil, err return nil, err
} }
return fetch.RawBody(), nil return fetch.RawBody(), nil
} else if tusItem.Err() == nil {
var tusData model.Tus
err := tusItem.Scan(&tusData)
if err != nil {
return nil, err
}
upload, err := shared.GetTusStore().GetUpload(context.Background(), tusData.Id)
if err != nil {
return nil, err
}
reader, err := upload.GetReader(context.Background())
if err != nil {
return nil, err
}
return reader, nil
} else {
return nil, errors.New("invalid file")
}
} }

View File

@ -1,35 +0,0 @@
package shared
import (
"github.com/golang-queue/queue"
"github.com/tus/tusd/pkg/filestore"
tusd "github.com/tus/tusd/pkg/handler"
)
var tusQueue *queue.Queue
var tusStore *filestore.FileStore
var tusComposer *tusd.StoreComposer
func SetTusQueue(q *queue.Queue) {
tusQueue = q
}
func GetTusQueue() *queue.Queue {
return tusQueue
}
func SetTusStore(s *filestore.FileStore) {
tusStore = s
}
func GetTusStore() *filestore.FileStore {
return tusStore
}
func SetTusComposer(c *tusd.StoreComposer) {
tusComposer = c
}
func GetTusComposer() *tusd.StoreComposer {
return tusComposer
}

View File

@ -1,4 +1,4 @@
package tus package main
import ( import (
"context" "context"
@ -9,7 +9,6 @@ import (
"git.lumeweb.com/LumeWeb/portal/db" "git.lumeweb.com/LumeWeb/portal/db"
"git.lumeweb.com/LumeWeb/portal/model" "git.lumeweb.com/LumeWeb/portal/model"
"git.lumeweb.com/LumeWeb/portal/service/files" "git.lumeweb.com/LumeWeb/portal/service/files"
"git.lumeweb.com/LumeWeb/portal/shared"
"github.com/golang-queue/queue" "github.com/golang-queue/queue"
"github.com/tus/tusd/pkg/filestore" "github.com/tus/tusd/pkg/filestore"
tusd "github.com/tus/tusd/pkg/handler" tusd "github.com/tus/tusd/pkg/handler"
@ -22,13 +21,15 @@ const TUS_API_PATH = "/files/tus"
const HASH_META_HEADER = "blake3-hash" const HASH_META_HEADER = "blake3-hash"
func Init() *tusd.Handler { var tusQueue *queue.Queue
store := &filestore.FileStore{ var store *filestore.FileStore
var composer *tusd.StoreComposer
func initTus() *tusd.Handler {
store = &filestore.FileStore{
Path: "/tmp", Path: "/tmp",
} }
shared.SetTusStore(store)
composer := tusd.NewStoreComposer() composer := tusd.NewStoreComposer()
composer.UseCore(store) composer.UseCore(store)
composer.UseConcater(store) composer.UseConcater(store)
@ -72,7 +73,7 @@ func Init() *tusd.Handler {
}, },
PreFinishResponseCallback: func(hook tusd.HookEvent) error { PreFinishResponseCallback: func(hook tusd.HookEvent) error {
tusEntry := &model.Tus{ tusEntry := &model.Tus{
Id: hook.Upload.ID, Path: hook.Upload.Storage["Path"],
Hash: hook.Upload.MetaData[HASH_META_HEADER], Hash: hook.Upload.MetaData[HASH_META_HEADER],
} }
@ -80,7 +81,7 @@ func Init() *tusd.Handler {
return err return err
} }
if err := shared.GetTusQueue().QueueTask(func(ctx context.Context) error { if err := tusQueue.QueueTask(func(ctx context.Context) error {
upload, err := store.GetUpload(nil, hook.Upload.ID) upload, err := store.GetUpload(nil, hook.Upload.ID)
if err != nil { if err != nil {
return err return err
@ -96,8 +97,7 @@ func Init() *tusd.Handler {
if err != nil { if err != nil {
panic(err) panic(err)
} }
tusQueue = queue.NewPool(5)
shared.SetTusQueue(queue.NewPool(5))
go tusStartup() go tusStartup()
@ -108,9 +108,6 @@ func tusStartup() {
result := map[int]model.Tus{} result := map[int]model.Tus{}
db.Get().Table("tus").Take(&result) db.Get().Table("tus").Take(&result)
tusQueue := shared.GetTusQueue()
store := shared.GetTusStore()
for _, item := range result { for _, item := range result {
if err := tusQueue.QueueTask(func(ctx context.Context) error { if err := tusQueue.QueueTask(func(ctx context.Context) error {
upload, err := store.GetUpload(nil, item.UploadID) upload, err := store.GetUpload(nil, item.UploadID)
@ -153,7 +150,7 @@ func tusWorker(upload *tusd.Upload) error {
ret = db.Get().Delete(&tusUpload) ret = db.Get().Delete(&tusUpload)
err = shared.GetTusComposer().Terminater.AsTerminatableUpload(*upload).Terminate(context.Background()) err = composer.Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
if err != nil { if err != nil {
log.Print(err) log.Print(err)