Compare commits
4 Commits
673f7c6dfd
...
2f514c02be
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | 2f514c02be | |
Derrick Hammer | 503cb55c55 | |
Derrick Hammer | 55d8dda6e8 | |
Derrick Hammer | 4548de5c60 |
7
main.go
7
main.go
|
@ -8,6 +8,7 @@ import (
|
||||||
_ "git.lumeweb.com/LumeWeb/portal/docs"
|
_ "git.lumeweb.com/LumeWeb/portal/docs"
|
||||||
"git.lumeweb.com/LumeWeb/portal/renterd"
|
"git.lumeweb.com/LumeWeb/portal/renterd"
|
||||||
"git.lumeweb.com/LumeWeb/portal/service/files"
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/tus"
|
||||||
"git.lumeweb.com/LumeWeb/portal/validator"
|
"git.lumeweb.com/LumeWeb/portal/validator"
|
||||||
"github.com/iris-contrib/swagger"
|
"github.com/iris-contrib/swagger"
|
||||||
"github.com/iris-contrib/swagger/swaggerFiles"
|
"github.com/iris-contrib/swagger/swaggerFiles"
|
||||||
|
@ -76,10 +77,10 @@ func main() {
|
||||||
app.Handle(new(controller.FilesController))
|
app.Handle(new(controller.FilesController))
|
||||||
})
|
})
|
||||||
|
|
||||||
tus := initTus()
|
tusHandler := tus.Init()
|
||||||
|
|
||||||
v1.Any(TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH+"/", tus)))
|
v1.Any(tus.TUS_API_PATH+"/{fileparam:path}", iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH+"/", tusHandler)))
|
||||||
v1.Post(TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+TUS_API_PATH, tus)))
|
v1.Post(tus.TUS_API_PATH, iris.FromStd(http.StripPrefix(v1.GetRelPath()+tus.TUS_API_PATH, tusHandler)))
|
||||||
|
|
||||||
swaggerConfig := swagger.Config{
|
swaggerConfig := swagger.Config{
|
||||||
// The url pointing to API definition.
|
// The url pointing to API definition.
|
||||||
|
|
|
@ -7,6 +7,6 @@ import (
|
||||||
type Tus struct {
|
type Tus struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
UploadID string `gorm:"primaryKey"`
|
UploadID string `gorm:"primaryKey"`
|
||||||
Path string
|
Id string
|
||||||
Hash string
|
Hash string
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package files
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -9,6 +10,7 @@ import (
|
||||||
"git.lumeweb.com/LumeWeb/portal/db"
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
"git.lumeweb.com/LumeWeb/portal/model"
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
"git.lumeweb.com/LumeWeb/portal/renterd"
|
"git.lumeweb.com/LumeWeb/portal/renterd"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
"io"
|
"io"
|
||||||
"lukechampine.com/blake3"
|
"lukechampine.com/blake3"
|
||||||
|
@ -134,16 +136,35 @@ func Upload(r io.ReadSeeker, file *os.File) (model.Upload, error) {
|
||||||
return upload, nil
|
return upload, nil
|
||||||
}
|
}
|
||||||
func Download(hash string) (io.Reader, error) {
|
func Download(hash string) (io.Reader, error) {
|
||||||
result := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row()
|
uploadItem := db.Get().Table("uploads").Where(&model.Upload{Hash: hash}).Row()
|
||||||
|
tusItem := db.Get().Table("tus").Where(&model.Tus{Hash: hash}).Row()
|
||||||
|
|
||||||
if result.Err() != nil {
|
if uploadItem.Err() == nil {
|
||||||
return nil, result.Err()
|
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return fetch.RawBody(), nil
|
||||||
|
} else if tusItem.Err() == nil {
|
||||||
|
var tusData model.Tus
|
||||||
|
err := tusItem.Scan(&tusData)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
upload, err := shared.GetTusStore().GetUpload(context.Background(), tusData.Id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := upload.GetReader(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return reader, nil
|
||||||
|
} else {
|
||||||
|
return nil, errors.New("invalid file")
|
||||||
}
|
}
|
||||||
|
|
||||||
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return fetch.RawBody(), nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
package shared
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/golang-queue/queue"
|
||||||
|
"github.com/tus/tusd/pkg/filestore"
|
||||||
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
|
)
|
||||||
|
|
||||||
|
var tusQueue *queue.Queue
|
||||||
|
var tusStore *filestore.FileStore
|
||||||
|
var tusComposer *tusd.StoreComposer
|
||||||
|
|
||||||
|
func SetTusQueue(q *queue.Queue) {
|
||||||
|
tusQueue = q
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetTusQueue() *queue.Queue {
|
||||||
|
return tusQueue
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetTusStore(s *filestore.FileStore) {
|
||||||
|
tusStore = s
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetTusStore() *filestore.FileStore {
|
||||||
|
return tusStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetTusComposer(c *tusd.StoreComposer) {
|
||||||
|
tusComposer = c
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetTusComposer() *tusd.StoreComposer {
|
||||||
|
return tusComposer
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package tus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -9,6 +9,7 @@ import (
|
||||||
"git.lumeweb.com/LumeWeb/portal/db"
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
"git.lumeweb.com/LumeWeb/portal/model"
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
"git.lumeweb.com/LumeWeb/portal/service/files"
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
||||||
"github.com/golang-queue/queue"
|
"github.com/golang-queue/queue"
|
||||||
"github.com/tus/tusd/pkg/filestore"
|
"github.com/tus/tusd/pkg/filestore"
|
||||||
tusd "github.com/tus/tusd/pkg/handler"
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
|
@ -21,15 +22,13 @@ const TUS_API_PATH = "/files/tus"
|
||||||
|
|
||||||
const HASH_META_HEADER = "blake3-hash"
|
const HASH_META_HEADER = "blake3-hash"
|
||||||
|
|
||||||
var tusQueue *queue.Queue
|
func Init() *tusd.Handler {
|
||||||
var store *filestore.FileStore
|
store := &filestore.FileStore{
|
||||||
var composer *tusd.StoreComposer
|
|
||||||
|
|
||||||
func initTus() *tusd.Handler {
|
|
||||||
store = &filestore.FileStore{
|
|
||||||
Path: "/tmp",
|
Path: "/tmp",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shared.SetTusStore(store)
|
||||||
|
|
||||||
composer := tusd.NewStoreComposer()
|
composer := tusd.NewStoreComposer()
|
||||||
composer.UseCore(store)
|
composer.UseCore(store)
|
||||||
composer.UseConcater(store)
|
composer.UseConcater(store)
|
||||||
|
@ -73,7 +72,7 @@ func initTus() *tusd.Handler {
|
||||||
},
|
},
|
||||||
PreFinishResponseCallback: func(hook tusd.HookEvent) error {
|
PreFinishResponseCallback: func(hook tusd.HookEvent) error {
|
||||||
tusEntry := &model.Tus{
|
tusEntry := &model.Tus{
|
||||||
Path: hook.Upload.Storage["Path"],
|
Id: hook.Upload.ID,
|
||||||
Hash: hook.Upload.MetaData[HASH_META_HEADER],
|
Hash: hook.Upload.MetaData[HASH_META_HEADER],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,7 +80,7 @@ func initTus() *tusd.Handler {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
if err := shared.GetTusQueue().QueueTask(func(ctx context.Context) error {
|
||||||
upload, err := store.GetUpload(nil, hook.Upload.ID)
|
upload, err := store.GetUpload(nil, hook.Upload.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -97,7 +96,8 @@ func initTus() *tusd.Handler {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
tusQueue = queue.NewPool(5)
|
|
||||||
|
shared.SetTusQueue(queue.NewPool(5))
|
||||||
|
|
||||||
go tusStartup()
|
go tusStartup()
|
||||||
|
|
||||||
|
@ -108,6 +108,9 @@ func tusStartup() {
|
||||||
result := map[int]model.Tus{}
|
result := map[int]model.Tus{}
|
||||||
db.Get().Table("tus").Take(&result)
|
db.Get().Table("tus").Take(&result)
|
||||||
|
|
||||||
|
tusQueue := shared.GetTusQueue()
|
||||||
|
store := shared.GetTusStore()
|
||||||
|
|
||||||
for _, item := range result {
|
for _, item := range result {
|
||||||
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
||||||
upload, err := store.GetUpload(nil, item.UploadID)
|
upload, err := store.GetUpload(nil, item.UploadID)
|
||||||
|
@ -150,7 +153,7 @@ func tusWorker(upload *tusd.Upload) error {
|
||||||
|
|
||||||
ret = db.Get().Delete(&tusUpload)
|
ret = db.Get().Delete(&tusUpload)
|
||||||
|
|
||||||
err = composer.Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
|
err = shared.GetTusComposer().Terminater.AsTerminatableUpload(*upload).Terminate(context.Background())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err)
|
log.Print(err)
|
Loading…
Reference in New Issue