refactor: make PutFileSmall handle all tasks
This commit is contained in:
parent
c084743b47
commit
fc53bd3083
|
@ -6,10 +6,16 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/api/middleware"
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
"git.lumeweb.com/LumeWeb/portal/account"
|
"git.lumeweb.com/LumeWeb/portal/account"
|
||||||
"git.lumeweb.com/LumeWeb/portal/api/middleware"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/bao"
|
"git.lumeweb.com/LumeWeb/portal/bao"
|
||||||
"git.lumeweb.com/LumeWeb/portal/cron"
|
"git.lumeweb.com/LumeWeb/portal/cron"
|
||||||
"git.lumeweb.com/LumeWeb/portal/db/models"
|
"git.lumeweb.com/LumeWeb/portal/db/models"
|
||||||
|
@ -27,10 +33,6 @@ import (
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type TusPreUploadCreateCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error)
|
type TusPreUploadCreateCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error)
|
||||||
|
@ -95,9 +97,24 @@ func NewStorageService(lc fx.Lifecycle, params StorageServiceParams) *StorageSer
|
||||||
return ss
|
return ss
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string) (*bao.Result, error) {
|
func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, userId uint, userIp string) (*models.Upload, error) {
|
||||||
hash, err := s.GetHashSmall(file)
|
hashResult, len, err := s.GetHashSmall(file)
|
||||||
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash.Hash)).ToBase64Url()
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
exists, upload := s.FileExists(hashResult.Hash)
|
||||||
|
if exists {
|
||||||
|
return upload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-seek the file to the beginning after hashing
|
||||||
|
_, err = file.Seek(0, io.SeekStart)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hashResult.Hash)).ToBase64Url()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -107,6 +124,18 @@ func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string) (
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
raw, err := io.ReadAll(file)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = file.Seek(0, io.SeekStart)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
mimeType := http.DetectContentType(raw)
|
||||||
|
|
||||||
err = s.renter.CreateBucketIfNotExists(bucket)
|
err = s.renter.CreateBucketIfNotExists(bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -118,13 +147,18 @@ func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string) (
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hash.Proof), bucket, fmt.Sprintf("%s.bao", hashStr))
|
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hashResult.Proof), bucket, fmt.Sprintf("%s.bao", hashStr))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return hash, nil
|
upload, err = s.CreateUpload(hashResult.Hash, mimeType, userId, userIp, uint64(len), bucket)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return upload, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, *s3.Client, error) {
|
func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, *s3.Client, error) {
|
||||||
|
@ -224,29 +258,29 @@ func (s *StorageServiceDefault) LoadInitialTasks(cron cron.CronService) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) FileExists(hash []byte) (bool, models.Upload) {
|
func (s *StorageServiceDefault) FileExists(hash []byte) (bool, *models.Upload) {
|
||||||
hashStr := hex.EncodeToString(hash)
|
hashStr := hex.EncodeToString(hash)
|
||||||
|
|
||||||
var upload models.Upload
|
var upload models.Upload
|
||||||
result := s.db.Model(&models.Upload{}).Where(&models.Upload{Hash: hashStr}).First(&upload)
|
result := s.db.Model(&models.Upload{}).Where(&models.Upload{Hash: hashStr}).First(&upload)
|
||||||
|
|
||||||
return result.RowsAffected > 0, upload
|
return result.RowsAffected > 0, &upload
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) (*bao.Result, error) {
|
func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) (*bao.Result, int, error) {
|
||||||
buf := bytes.NewBuffer(nil)
|
buf := bytes.NewBuffer(nil)
|
||||||
|
|
||||||
_, err := io.Copy(buf, file)
|
_, err := io.Copy(buf, file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result, _, err := bao.Hash(buf)
|
result, _, err := bao.Hash(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, buf.Len(), nil
|
||||||
}
|
}
|
||||||
func (s *StorageServiceDefault) GetHash(file io.Reader) (*bao.Result, int, error) {
|
func (s *StorageServiceDefault) GetHash(file io.Reader) (*bao.Result, int, error) {
|
||||||
hash, totalBytes, err := bao.Hash(file)
|
hash, totalBytes, err := bao.Hash(file)
|
||||||
|
@ -290,7 +324,7 @@ func (s *StorageServiceDefault) tusWorker() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
uploaderID, ok := info.Context.Value(middleware.S5AuthUserIDKey).(uint64)
|
uploaderID, ok := info.Context.Value(middleware.DEFAULT_AUTH_CONTEXT_KEY).(uint64)
|
||||||
if !ok {
|
if !ok {
|
||||||
errorResponse.Body = "Missing user id in context"
|
errorResponse.Body = "Missing user id in context"
|
||||||
info.Upload.StopUpload(errorResponse)
|
info.Upload.StopUpload(errorResponse)
|
||||||
|
|
Loading…
Reference in New Issue