refactor: create a new tus store that uses the db for meta instead of the filesystem
This commit is contained in:
parent
7845f95776
commit
39936b3b14
|
@ -9,4 +9,5 @@ type Tus struct {
|
||||||
ID uint64 `gorm:"primaryKey"`
|
ID uint64 `gorm:"primaryKey"`
|
||||||
UploadID string
|
UploadID string
|
||||||
Hash string
|
Hash string
|
||||||
|
Info string
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"git.lumeweb.com/LumeWeb/portal/model"
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
"git.lumeweb.com/LumeWeb/portal/service/files"
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/tusstore"
|
||||||
"github.com/golang-queue/queue"
|
"github.com/golang-queue/queue"
|
||||||
tusd "github.com/tus/tusd/pkg/handler"
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
"github.com/tus/tusd/pkg/memorylocker"
|
"github.com/tus/tusd/pkg/memorylocker"
|
||||||
|
@ -24,7 +25,7 @@ const TUS_API_PATH = "/files/tus"
|
||||||
const HASH_META_HEADER = "blake3-hash"
|
const HASH_META_HEADER = "blake3-hash"
|
||||||
|
|
||||||
func Init() *tusd.Handler {
|
func Init() *tusd.Handler {
|
||||||
store := &filestore.FileStore{
|
store := &tusstore.DbFileStore{
|
||||||
Path: "/tmp",
|
Path: "/tmp",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,282 @@
|
||||||
|
package tusstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
|
"github.com/tus/tusd/pkg/handler"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"io"
|
||||||
|
"lukechampine.com/blake3"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
var defaultFilePerm = os.FileMode(0664)
|
||||||
|
|
||||||
|
type DbFileStore struct {
|
||||||
|
// Relative or absolute path to store files in. DbFileStore does not check
|
||||||
|
// whether the path exists, use os.MkdirAll in this case on your own.
|
||||||
|
Path string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) UseIn(composer *handler.StoreComposer) {
|
||||||
|
composer.UseCore(store)
|
||||||
|
composer.UseTerminater(store)
|
||||||
|
composer.UseConcater(store)
|
||||||
|
composer.UseLengthDeferrer(store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
|
||||||
|
if info.ID == "" {
|
||||||
|
info.ID = uid()
|
||||||
|
}
|
||||||
|
binPath := store.binPath(info.ID)
|
||||||
|
info.Storage = map[string]string{
|
||||||
|
"Type": "dbstore",
|
||||||
|
"Path": binPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create binary file with no content
|
||||||
|
file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
err = fmt.Errorf("upload directory does not exist: %s", store.Path)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = file.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
hasher := blake3.New(64, nil)
|
||||||
|
|
||||||
|
_, err = io.Copy(hasher, file)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var hash []byte
|
||||||
|
|
||||||
|
_, err = hasher.XOF().Read(hash)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
upload := &fileUpload{
|
||||||
|
info: info,
|
||||||
|
binPath: binPath,
|
||||||
|
hash: hex.EncodeToString(hash),
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeInfo creates the file by itself if necessary
|
||||||
|
err = upload.writeInfo()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return upload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) GetUpload(ctx context.Context, id string) (handler.Upload, error) {
|
||||||
|
info := handler.FileInfo{}
|
||||||
|
|
||||||
|
fUpload := &fileUpload{
|
||||||
|
info: info,
|
||||||
|
}
|
||||||
|
|
||||||
|
record, is404, err := fUpload.getInfo()
|
||||||
|
if err != nil {
|
||||||
|
if is404 {
|
||||||
|
// Interpret os.ErrNotExist as 404 Not Found
|
||||||
|
err = handler.ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(record.Info), &info); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fUpload.hash = record.Hash
|
||||||
|
binPath := store.binPath(id)
|
||||||
|
stat, err := os.Stat(binPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
// Interpret os.ErrNotExist as 404 Not Found
|
||||||
|
err = handler.ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
info.Offset = stat.Size()
|
||||||
|
|
||||||
|
fUpload.binPath = binPath
|
||||||
|
|
||||||
|
return fUpload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
|
||||||
|
return upload.(*fileUpload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload {
|
||||||
|
return upload.(*fileUpload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
|
||||||
|
return upload.(*fileUpload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// binPath returns the path to the file storing the binary data.
|
||||||
|
func (store DbFileStore) binPath(id string) string {
|
||||||
|
return filepath.Join(store.Path, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
type fileUpload struct {
|
||||||
|
// info stores the current information about the upload
|
||||||
|
info handler.FileInfo
|
||||||
|
// binPath is the path to the binary file (which has no extension)
|
||||||
|
binPath string
|
||||||
|
hash string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) GetInfo(ctx context.Context) (handler.FileInfo, error) {
|
||||||
|
return upload.info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
|
||||||
|
file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
n, err := io.Copy(file, src)
|
||||||
|
|
||||||
|
upload.info.Offset += n
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) {
|
||||||
|
return os.Open(upload.binPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) Terminate(ctx context.Context) error {
|
||||||
|
tusUpload := &model.Tus{
|
||||||
|
UploadID: upload.info.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
ret := db.Get().Where(&tusUpload).Delete(&tusUpload)
|
||||||
|
|
||||||
|
if ret.Error != nil {
|
||||||
|
logger.Get().Error("failed to delete tus entry", zap.Error(ret.Error))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Remove(upload.binPath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []handler.Upload) (err error) {
|
||||||
|
file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
for _, partialUpload := range uploads {
|
||||||
|
fileUpload := partialUpload.(*fileUpload)
|
||||||
|
|
||||||
|
src, err := os.Open(fileUpload.binPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := io.Copy(file, src); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) DeclareLength(ctx context.Context, length int64) error {
|
||||||
|
upload.info.Size = length
|
||||||
|
upload.info.SizeIsDeferred = false
|
||||||
|
return upload.writeInfo()
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeInfo updates the entire information. Everything will be overwritten.
|
||||||
|
func (upload *fileUpload) writeInfo() error {
|
||||||
|
data, err := json.Marshal(upload.info)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tusRecord, is404, err := upload.getInfo()
|
||||||
|
|
||||||
|
if err != nil && !is404 {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tusRecord != nil {
|
||||||
|
tusRecord.Info = string(data)
|
||||||
|
if ret := db.Get().Update("info", &tusRecord); ret.Error != nil {
|
||||||
|
logger.Get().Error("failed to update tus entry", zap.Error(ret.Error))
|
||||||
|
|
||||||
|
return ret.Error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tusRecord = &model.Tus{UploadID: upload.info.ID, Hash: upload.hash, Info: string(data)}
|
||||||
|
|
||||||
|
if ret := db.Get().Create(&tusRecord); ret.Error != nil {
|
||||||
|
logger.Get().Error("failed to create tus entry", zap.Error(ret.Error))
|
||||||
|
|
||||||
|
return ret.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) getInfo() (*model.Tus, bool, error) {
|
||||||
|
var tusRecord model.Tus
|
||||||
|
|
||||||
|
result := db.Get().Where(&model.Tus{UploadID: upload.info.ID}).First(&tusRecord)
|
||||||
|
|
||||||
|
if result.Error != nil && result.Error.Error() != "record not found" {
|
||||||
|
logger.Get().Error("failed to query tus entry", zap.Error(result.Error))
|
||||||
|
return nil, false, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
return nil, true, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return &tusRecord, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) FinishUpload(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func uid() string {
|
||||||
|
id := make([]byte, 16)
|
||||||
|
_, err := io.ReadFull(rand.Reader, id)
|
||||||
|
if err != nil {
|
||||||
|
// This is probably an appropriate way to handle errors from our source
|
||||||
|
// for random bits.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return hex.EncodeToString(id)
|
||||||
|
}
|
Loading…
Reference in New Issue