2023-05-16 22:45:32 +00:00
|
|
|
package tus
|
2023-05-15 16:36:00 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/hex"
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"git.lumeweb.com/LumeWeb/portal/cid"
|
|
|
|
"git.lumeweb.com/LumeWeb/portal/db"
|
2023-05-22 23:05:38 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
2023-05-15 16:36:00 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/model"
|
|
|
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
2023-05-16 22:45:32 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
2023-05-22 23:07:06 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/tusstore"
|
2023-05-15 16:36:00 +00:00
|
|
|
"github.com/golang-queue/queue"
|
|
|
|
tusd "github.com/tus/tusd/pkg/handler"
|
|
|
|
"github.com/tus/tusd/pkg/memorylocker"
|
2023-05-19 13:04:47 +00:00
|
|
|
"go.uber.org/zap"
|
2023-06-06 21:25:29 +00:00
|
|
|
"golang.org/x/exp/slices"
|
2023-06-09 11:57:06 +00:00
|
|
|
"gorm.io/gorm"
|
2023-05-17 13:52:25 +00:00
|
|
|
"io"
|
2023-05-15 16:36:00 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
	// TUS_API_PATH is the route suffix of the tus endpoint; Init appends it
	// to "/api/v1" to form the handler's base path.
	TUS_API_PATH = "/files/tus"

	// HASH_META_HEADER is the tus metadata key under which the client sends
	// the hex-encoded hash of the file being uploaded.
	HASH_META_HEADER = "hash"
)
|
2023-05-15 16:36:00 +00:00
|
|
|
|
2023-05-16 22:45:32 +00:00
|
|
|
func Init() *tusd.Handler {
|
2023-05-22 23:07:06 +00:00
|
|
|
store := &tusstore.DbFileStore{
|
2023-05-15 16:36:00 +00:00
|
|
|
Path: "/tmp",
|
|
|
|
}
|
|
|
|
|
2023-05-16 22:45:32 +00:00
|
|
|
shared.SetTusStore(store)
|
|
|
|
|
2023-05-15 16:36:00 +00:00
|
|
|
composer := tusd.NewStoreComposer()
|
|
|
|
composer.UseCore(store)
|
|
|
|
composer.UseConcater(store)
|
|
|
|
composer.UseLocker(memorylocker.New())
|
|
|
|
composer.UseTerminater(store)
|
2023-05-17 17:34:05 +00:00
|
|
|
shared.SetTusComposer(composer)
|
2023-05-15 16:36:00 +00:00
|
|
|
|
|
|
|
handler, err := tusd.NewHandler(tusd.Config{
|
|
|
|
BasePath: "/api/v1" + TUS_API_PATH,
|
|
|
|
StoreComposer: composer,
|
|
|
|
PreUploadCreateCallback: func(hook tusd.HookEvent) error {
|
|
|
|
hash := hook.Upload.MetaData[HASH_META_HEADER]
|
|
|
|
|
|
|
|
if len(hash) == 0 {
|
2023-06-10 05:59:56 +00:00
|
|
|
msg := "missing hash metadata"
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Debug(msg)
|
2023-05-19 13:04:47 +00:00
|
|
|
return errors.New(msg)
|
2023-05-15 16:36:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var upload model.Upload
|
|
|
|
result := db.Get().Where(&model.Upload{Hash: hash}).First(&upload)
|
2023-06-09 11:57:06 +00:00
|
|
|
if (result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound)) || result.RowsAffected > 0 {
|
2023-05-15 16:36:00 +00:00
|
|
|
hashBytes, err := hex.DecodeString(hash)
|
|
|
|
if err != nil {
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Debug("invalid hash", zap.Error(err))
|
2023-05-15 16:36:00 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
cidString, err := cid.Encode(hashBytes, uint64(hook.Upload.Size))
|
|
|
|
|
|
|
|
if err != nil {
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Debug("failed to create cid", zap.Error(err))
|
2023-05-15 16:36:00 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
resp, err := json.Marshal(UploadResponse{Cid: cidString})
|
|
|
|
|
|
|
|
if err != nil {
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Error("failed to create response", zap.Error(err))
|
2023-05-15 16:36:00 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return tusd.NewHTTPError(errors.New(string(resp)), 304)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2023-05-16 22:45:32 +00:00
|
|
|
|
2023-05-24 00:12:48 +00:00
|
|
|
pool := queue.NewPool(5)
|
|
|
|
|
|
|
|
shared.SetTusQueue(pool)
|
|
|
|
shared.SetTusWorker(tusWorker)
|
2023-05-15 16:36:00 +00:00
|
|
|
|
|
|
|
go tusStartup()
|
|
|
|
|
|
|
|
return handler
|
|
|
|
}
|
|
|
|
|
|
|
|
func tusStartup() {
|
2023-05-24 00:12:48 +00:00
|
|
|
tusQueue := getQueue()
|
|
|
|
store := getStore()
|
2023-05-16 22:45:32 +00:00
|
|
|
|
2023-06-06 21:01:54 +00:00
|
|
|
rows, err := db.Get().Model(&model.Tus{}).Rows()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
logger.Get().Error("failed to query tus uploads", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
|
|
|
defer rows.Close()
|
|
|
|
|
2023-06-06 21:25:29 +00:00
|
|
|
processedHashes := make([]string, 0)
|
|
|
|
|
2023-06-06 21:01:54 +00:00
|
|
|
for rows.Next() {
|
|
|
|
var tusUpload model.Tus
|
|
|
|
err := db.Get().ScanRows(rows, &tusUpload)
|
|
|
|
if err != nil {
|
|
|
|
logger.Get().Error("failed to scan tus records", zap.Error(err))
|
|
|
|
return
|
|
|
|
}
|
2023-06-06 21:25:29 +00:00
|
|
|
|
|
|
|
upload, err := store.GetUpload(nil, tusUpload.UploadID)
|
|
|
|
if err != nil {
|
|
|
|
logger.Get().Error("failed to query tus upload", zap.Error(err))
|
|
|
|
db.Get().Delete(&tusUpload)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if slices.Contains(processedHashes, tusUpload.Hash) {
|
|
|
|
err := terminateUpload(upload)
|
2023-05-15 16:36:00 +00:00
|
|
|
if err != nil {
|
2023-06-06 21:25:29 +00:00
|
|
|
logger.Get().Error("failed to terminate tus upload", zap.Error(err))
|
2023-05-15 16:36:00 +00:00
|
|
|
}
|
2023-06-06 21:25:29 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
2023-05-15 16:36:00 +00:00
|
|
|
return tusWorker(&upload)
|
|
|
|
}); err != nil {
|
2023-06-06 21:25:29 +00:00
|
|
|
logger.Get().Error("failed to queue tus upload", zap.Error(err))
|
|
|
|
} else {
|
|
|
|
processedHashes = append(processedHashes, tusUpload.Hash)
|
2023-05-15 16:36:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func tusWorker(upload *tusd.Upload) error {
|
2023-05-19 13:04:47 +00:00
|
|
|
info, err := (*upload).GetInfo(context.Background())
|
2023-05-15 16:36:00 +00:00
|
|
|
if err != nil {
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Error("failed to query tus upload metadata", zap.Error(err))
|
2023-05-15 16:36:00 +00:00
|
|
|
return err
|
|
|
|
}
|
2023-05-19 13:04:47 +00:00
|
|
|
file, err := (*upload).GetReader(context.Background())
|
2023-05-15 16:36:00 +00:00
|
|
|
if err != nil {
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Error("failed reading upload", zap.Error(err))
|
2023-05-15 16:36:00 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-05-22 15:00:24 +00:00
|
|
|
hashHex := info.MetaData[HASH_META_HEADER]
|
|
|
|
|
|
|
|
hashBytes, err := hex.DecodeString(hashHex)
|
|
|
|
|
|
|
|
if err != nil {
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Error("failed decoding hash", zap.Error(err))
|
2023-05-22 15:00:24 +00:00
|
|
|
tErr := terminateUpload(*upload)
|
|
|
|
|
|
|
|
if tErr != nil {
|
|
|
|
return tErr
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-09 11:04:52 +00:00
|
|
|
newUpload, err := files.Upload(file.(io.ReadSeeker), info.Size, hashBytes)
|
2023-05-22 14:26:48 +00:00
|
|
|
tErr := terminateUpload(*upload)
|
2023-05-17 17:34:27 +00:00
|
|
|
|
2023-05-22 14:26:48 +00:00
|
|
|
if tErr != nil {
|
|
|
|
return tErr
|
2023-05-15 16:36:00 +00:00
|
|
|
}
|
|
|
|
|
2023-05-17 17:34:27 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-09 11:04:52 +00:00
|
|
|
err = files.Pin(newUpload.Hash, newUpload.AccountID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-05-17 17:34:27 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func terminateUpload(upload tusd.Upload) error {
|
2023-05-24 00:12:48 +00:00
|
|
|
err := getComposer().Terminater.AsTerminatableUpload(upload).Terminate(context.Background())
|
2023-05-15 16:36:00 +00:00
|
|
|
|
|
|
|
if err != nil {
|
2023-05-22 23:05:38 +00:00
|
|
|
logger.Get().Error("failed deleting tus upload", zap.Error(err))
|
2023-05-22 14:26:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err != nil {
|
2023-05-15 16:36:00 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// UploadResponse is the JSON payload describing an upload by its CID.
type UploadResponse struct {
	// Cid is the content identifier of the file, serialized as "cid".
	Cid string `json:"cid"`
}
|
2023-05-24 00:12:48 +00:00
|
|
|
|
|
|
|
func getQueue() *queue.Queue {
|
|
|
|
ret := shared.GetTusQueue()
|
|
|
|
return (*ret).(*queue.Queue)
|
|
|
|
}
|
|
|
|
|
|
|
|
func getStore() *tusstore.DbFileStore {
|
|
|
|
ret := shared.GetTusStore()
|
|
|
|
return (*ret).(*tusstore.DbFileStore)
|
|
|
|
}
|
|
|
|
func getComposer() *tusd.StoreComposer {
|
|
|
|
ret := shared.GetTusComposer()
|
|
|
|
return (*ret).(*tusd.StoreComposer)
|
|
|
|
}
|