2024-01-15 04:52:54 +00:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
2024-01-19 20:51:31 +00:00
|
|
|
"context"
|
2024-01-16 05:58:51 +00:00
|
|
|
"encoding/hex"
|
2024-01-16 06:36:19 +00:00
|
|
|
"errors"
|
2024-01-25 14:50:17 +00:00
|
|
|
"fmt"
|
2024-02-16 01:55:21 +00:00
|
|
|
"io"
|
|
|
|
"net/http"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"git.lumeweb.com/LumeWeb/portal/api/middleware"
|
|
|
|
|
2024-01-15 04:52:54 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
2024-01-19 20:51:31 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
2024-01-28 07:20:59 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/account"
|
2024-02-09 20:23:33 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/bao"
|
2024-01-28 07:20:59 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/cron"
|
2024-01-16 05:40:50 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/db/models"
|
2024-02-01 02:27:38 +00:00
|
|
|
"git.lumeweb.com/LumeWeb/portal/renter"
|
2024-01-19 20:51:31 +00:00
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
2024-01-23 00:06:28 +00:00
|
|
|
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
2024-01-19 20:51:31 +00:00
|
|
|
"github.com/google/uuid"
|
2024-01-28 07:20:59 +00:00
|
|
|
"github.com/spf13/viper"
|
2024-01-19 20:51:31 +00:00
|
|
|
tusd "github.com/tus/tusd/v2/pkg/handler"
|
2024-01-28 07:20:59 +00:00
|
|
|
"github.com/tus/tusd/v2/pkg/s3store"
|
2024-01-25 13:37:15 +00:00
|
|
|
"go.sia.tech/renterd/api"
|
2024-01-28 07:20:59 +00:00
|
|
|
"go.uber.org/fx"
|
2024-01-19 20:51:31 +00:00
|
|
|
"go.uber.org/zap"
|
2024-01-28 07:20:59 +00:00
|
|
|
"gorm.io/gorm"
|
2024-01-15 04:52:54 +00:00
|
|
|
)
|
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
type TusPreUploadCreateCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error)
|
|
|
|
type TusPreFinishResponseCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, error)
|
|
|
|
|
|
|
|
type StorageServiceParams struct {
|
|
|
|
fx.In
|
|
|
|
Config *viper.Viper
|
|
|
|
Logger *zap.Logger
|
|
|
|
Db *gorm.DB
|
2024-02-01 01:29:27 +00:00
|
|
|
Accounts *account.AccountServiceDefault
|
|
|
|
Cron *cron.CronServiceDefault
|
2024-02-01 23:12:09 +00:00
|
|
|
Renter *renter.RenterDefault
|
2024-01-28 07:20:59 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var Module = fx.Module("storage",
|
|
|
|
fx.Provide(
|
|
|
|
NewStorageService,
|
|
|
|
),
|
2024-02-01 02:28:02 +00:00
|
|
|
fx.Invoke(func(s *StorageServiceDefault) error {
|
|
|
|
return s.init()
|
|
|
|
}),
|
2024-01-15 04:52:54 +00:00
|
|
|
)
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
type StorageServiceDefault struct {
|
2024-02-01 02:27:38 +00:00
|
|
|
tus *tusd.Handler
|
|
|
|
tusStore tusd.DataStore
|
|
|
|
s3Client *s3.Client
|
|
|
|
config *viper.Viper
|
|
|
|
logger *zap.Logger
|
|
|
|
db *gorm.DB
|
|
|
|
accounts *account.AccountServiceDefault
|
|
|
|
cron *cron.CronServiceDefault
|
|
|
|
renter *renter.RenterDefault
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) Tus() *tusd.Handler {
|
2024-01-19 21:51:41 +00:00
|
|
|
return s.tus
|
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) Start() error {
|
2024-01-19 20:51:31 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-02-01 02:28:31 +00:00
|
|
|
func NewStorageService(lc fx.Lifecycle, params StorageServiceParams) *StorageServiceDefault {
|
|
|
|
ss := &StorageServiceDefault{
|
2024-01-28 07:20:59 +00:00
|
|
|
config: params.Config,
|
|
|
|
logger: params.Logger,
|
|
|
|
db: params.Db,
|
|
|
|
accounts: params.Accounts,
|
|
|
|
cron: params.Cron,
|
2024-02-01 23:12:09 +00:00
|
|
|
renter: params.Renter,
|
2024-01-15 04:52:54 +00:00
|
|
|
}
|
2024-02-01 02:28:31 +00:00
|
|
|
|
|
|
|
lc.Append(fx.Hook{
|
|
|
|
OnStart: func(ctx context.Context) error {
|
|
|
|
go ss.tusWorker()
|
|
|
|
return nil
|
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
return ss
|
2024-01-15 04:52:54 +00:00
|
|
|
}
|
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, userId uint, userIp string) (*models.Upload, error) {
|
|
|
|
hashResult, len, err := s.GetHashSmall(file)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
exists, upload := s.FileExists(hashResult.Hash)
|
|
|
|
if exists {
|
|
|
|
return upload, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Re-seek the file to the beginning after hashing
|
|
|
|
_, err = file.Seek(0, io.SeekStart)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hashResult.Hash)).ToBase64Url()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = file.Seek(0, io.SeekStart)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
raw, err := io.ReadAll(file)
|
2024-01-15 04:52:54 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-01-16 05:48:06 +00:00
|
|
|
_, err = file.Seek(0, io.SeekStart)
|
2024-01-15 04:52:54 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
mimeType := http.DetectContentType(raw)
|
|
|
|
|
2024-02-01 02:27:38 +00:00
|
|
|
err = s.renter.CreateBucketIfNotExists(bucket)
|
2024-01-15 19:25:12 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-02-01 02:27:38 +00:00
|
|
|
err = s.renter.UploadObject(context.Background(), file, bucket, hashStr)
|
2024-01-25 13:37:15 +00:00
|
|
|
|
2024-01-15 04:52:54 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hashResult.Proof), bucket, fmt.Sprintf("%s.bao", hashStr))
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
upload, err = s.CreateUpload(hashResult.Hash, mimeType, userId, userIp, uint64(len), bucket)
|
2024-01-19 20:51:31 +00:00
|
|
|
if err != nil {
|
2024-02-09 20:23:33 +00:00
|
|
|
return nil, err
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
return upload, nil
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, *s3.Client, error) {
|
2024-01-19 20:51:31 +00:00
|
|
|
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
|
|
|
if service == s3.ServiceID {
|
|
|
|
return aws.Endpoint{
|
2024-01-28 07:20:59 +00:00
|
|
|
URL: s.config.GetString("core.storage.s3.endpoint"),
|
|
|
|
SigningRegion: s.config.GetString("core.storage.s3.region"),
|
2024-01-19 20:51:31 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
|
|
|
|
})
|
|
|
|
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.TODO(),
|
|
|
|
config.WithRegion("us-east-1"),
|
|
|
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
2024-01-28 07:20:59 +00:00
|
|
|
s.config.GetString("core.storage.s3.accessKey"),
|
|
|
|
s.config.GetString("core.storage.s3.secretKey"),
|
2024-01-19 20:51:31 +00:00
|
|
|
"",
|
|
|
|
)),
|
|
|
|
config.WithEndpointResolverWithOptions(customResolver),
|
|
|
|
)
|
|
|
|
if err != nil {
|
2024-01-22 23:51:09 +00:00
|
|
|
return nil, nil, nil, nil
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
s3Client := s3.NewFromConfig(cfg)
|
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
store := s3store.New(s.config.GetString("core.storage.s3.bufferBucket"), s3Client)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
locker := NewMySQLLocker(s.db, s.logger)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
composer := tusd.NewStoreComposer()
|
|
|
|
store.UseIn(composer)
|
|
|
|
composer.UseLocker(locker)
|
2024-01-15 13:38:05 +00:00
|
|
|
|
2024-01-19 20:51:31 +00:00
|
|
|
handler, err := tusd.NewHandler(tusd.Config{
|
|
|
|
BasePath: basePath,
|
|
|
|
StoreComposer: composer,
|
|
|
|
DisableDownload: true,
|
|
|
|
NotifyCompleteUploads: true,
|
|
|
|
NotifyTerminatedUploads: true,
|
2024-01-20 11:27:11 +00:00
|
|
|
NotifyCreatedUploads: true,
|
2024-01-24 19:46:46 +00:00
|
|
|
RespectForwardedHeaders: true,
|
2024-01-19 20:51:31 +00:00
|
|
|
PreUploadCreateCallback: preUploadCb,
|
|
|
|
})
|
|
|
|
|
2024-01-22 23:51:09 +00:00
|
|
|
return handler, store, s3Client, err
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) init() error {
|
2024-01-19 20:51:31 +00:00
|
|
|
preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) {
|
|
|
|
blankResp := tusd.HTTPResponse{}
|
|
|
|
blankChanges := tusd.FileInfoChanges{}
|
|
|
|
|
|
|
|
hash, ok := hook.Upload.MetaData["hash"]
|
|
|
|
if !ok {
|
|
|
|
return blankResp, blankChanges, errors.New("missing hash")
|
|
|
|
}
|
|
|
|
|
|
|
|
decodedHash, err := encoding.MultihashFromBase64Url(hash)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return blankResp, blankChanges, err
|
|
|
|
}
|
|
|
|
|
|
|
|
exists, _ := s.FileExists(decodedHash.HashBytes())
|
|
|
|
|
|
|
|
if exists {
|
|
|
|
return blankResp, blankChanges, errors.New("file already exists")
|
|
|
|
}
|
|
|
|
|
|
|
|
exists, _ = s.TusUploadExists(decodedHash.HashBytes())
|
|
|
|
|
|
|
|
if exists {
|
|
|
|
return blankResp, blankChanges, errors.New("file is already being uploaded")
|
|
|
|
}
|
|
|
|
|
|
|
|
return blankResp, blankChanges, nil
|
|
|
|
}
|
|
|
|
|
2024-01-22 23:53:31 +00:00
|
|
|
tus, store, s3client, err := s.BuildUploadBufferTus("/s5/upload/tus", preUpload, nil)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
s.tus = tus
|
|
|
|
s.tusStore = store
|
2024-01-22 23:53:31 +00:00
|
|
|
s.s3Client = s3client
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
s.cron.RegisterService(s)
|
2024-01-19 22:47:14 +00:00
|
|
|
|
2024-01-19 20:51:31 +00:00
|
|
|
return nil
|
2024-01-15 13:38:05 +00:00
|
|
|
}
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) LoadInitialTasks(cron cron.CronService) error {
|
2024-01-19 22:47:14 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
func (s *StorageServiceDefault) FileExists(hash []byte) (bool, *models.Upload) {
|
2024-01-16 05:58:51 +00:00
|
|
|
hashStr := hex.EncodeToString(hash)
|
2024-01-16 05:40:50 +00:00
|
|
|
|
2024-01-16 06:01:57 +00:00
|
|
|
var upload models.Upload
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Model(&models.Upload{}).Where(&models.Upload{Hash: hashStr}).First(&upload)
|
2024-01-16 05:40:50 +00:00
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
return result.RowsAffected > 0, &upload
|
2024-01-16 05:40:50 +00:00
|
|
|
}
|
2024-01-16 05:48:06 +00:00
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) (*bao.Result, int, error) {
|
2024-01-16 05:48:06 +00:00
|
|
|
buf := bytes.NewBuffer(nil)
|
|
|
|
|
|
|
|
_, err := io.Copy(buf, file)
|
|
|
|
if err != nil {
|
2024-02-16 01:55:21 +00:00
|
|
|
return nil, 0, err
|
2024-01-16 05:48:06 +00:00
|
|
|
}
|
|
|
|
|
2024-02-09 20:23:33 +00:00
|
|
|
result, _, err := bao.Hash(buf)
|
|
|
|
if err != nil {
|
2024-02-16 01:55:21 +00:00
|
|
|
return nil, 0, err
|
2024-02-09 20:23:33 +00:00
|
|
|
}
|
2024-01-16 05:48:06 +00:00
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
return result, buf.Len(), nil
|
2024-01-16 05:48:06 +00:00
|
|
|
}
|
2024-02-09 20:23:33 +00:00
|
|
|
func (s *StorageServiceDefault) GetHash(file io.Reader) (*bao.Result, int, error) {
|
|
|
|
hash, totalBytes, err := bao.Hash(file)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if err != nil {
|
2024-01-22 23:52:37 +00:00
|
|
|
return nil, 0, err
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
2024-02-09 20:23:33 +00:00
|
|
|
return hash, totalBytes, nil
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) CreateUpload(hash []byte, mime string, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error) {
|
2024-01-17 19:46:22 +00:00
|
|
|
hashStr := hex.EncodeToString(hash)
|
|
|
|
|
|
|
|
upload := &models.Upload{
|
|
|
|
Hash: hashStr,
|
2024-01-26 00:05:52 +00:00
|
|
|
MimeType: mime,
|
2024-01-17 19:46:22 +00:00
|
|
|
UserID: uploaderID,
|
|
|
|
UploaderIP: uploaderIP,
|
|
|
|
Protocol: protocol,
|
|
|
|
Size: size,
|
|
|
|
}
|
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Create(upload)
|
2024-01-17 19:46:22 +00:00
|
|
|
|
|
|
|
if result.Error != nil {
|
|
|
|
return nil, result.Error
|
|
|
|
}
|
|
|
|
|
|
|
|
return upload, nil
|
|
|
|
}
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) tusWorker() {
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case info := <-s.tus.CreatedUploads:
|
|
|
|
hash, ok := info.Upload.MetaData["hash"]
|
2024-01-20 11:41:51 +00:00
|
|
|
errorResponse := tusd.HTTPResponse{StatusCode: 400, Header: nil}
|
2024-01-19 20:51:31 +00:00
|
|
|
if !ok {
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Missing hash in metadata")
|
2024-01-19 20:51:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-02-16 01:55:21 +00:00
|
|
|
uploaderID, ok := info.Context.Value(middleware.DEFAULT_AUTH_CONTEXT_KEY).(uint64)
|
2024-01-19 20:51:31 +00:00
|
|
|
if !ok {
|
2024-01-20 11:41:51 +00:00
|
|
|
errorResponse.Body = "Missing user id in context"
|
|
|
|
info.Upload.StopUpload(errorResponse)
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Missing user id in context")
|
2024-01-19 20:51:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
uploaderIP := info.HTTPRequest.RemoteAddr
|
|
|
|
|
|
|
|
decodedHash, err := encoding.MultihashFromBase64Url(hash)
|
|
|
|
|
|
|
|
if err != nil {
|
2024-01-20 11:41:51 +00:00
|
|
|
errorResponse.Body = "Could not decode hash"
|
|
|
|
info.Upload.StopUpload(errorResponse)
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Could not decode hash", zap.Error(err))
|
2024-01-19 20:51:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-01-20 11:57:57 +00:00
|
|
|
_, err = s.CreateTusUpload(decodedHash.HashBytes(), info.Upload.ID, uint(uploaderID), uploaderIP, info.Context.Value("protocol").(string))
|
2024-01-19 20:51:31 +00:00
|
|
|
if err != nil {
|
2024-01-20 11:41:51 +00:00
|
|
|
errorResponse.Body = "Could not create tus upload"
|
|
|
|
info.Upload.StopUpload(errorResponse)
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Could not create tus upload", zap.Error(err))
|
2024-01-19 20:51:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
case info := <-s.tus.UploadProgress:
|
|
|
|
err := s.TusUploadProgress(info.Upload.ID)
|
|
|
|
if err != nil {
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Could not update tus upload", zap.Error(err))
|
2024-01-19 20:51:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
case info := <-s.tus.TerminatedUploads:
|
|
|
|
err := s.DeleteTusUpload(info.Upload.ID)
|
|
|
|
if err != nil {
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Could not delete tus upload", zap.Error(err))
|
2024-01-19 20:51:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
case info := <-s.tus.CompleteUploads:
|
2024-01-22 23:02:15 +00:00
|
|
|
if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) {
|
2024-01-22 22:49:42 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
err := s.TusUploadCompleted(info.Upload.ID)
|
|
|
|
if err != nil {
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Could not complete tus upload", zap.Error(err))
|
2024-01-22 22:49:42 +00:00
|
|
|
continue
|
|
|
|
}
|
2024-01-28 21:26:15 +00:00
|
|
|
err = s.ScheduleTusUpload(info.Upload.ID)
|
2024-01-19 20:51:31 +00:00
|
|
|
if err != nil {
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Could not schedule tus upload", zap.Error(err))
|
2024-01-19 20:51:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) TusUploadExists(hash []byte) (bool, models.TusUpload) {
|
2024-01-19 20:51:31 +00:00
|
|
|
hashStr := hex.EncodeToString(hash)
|
|
|
|
|
2024-01-20 12:05:27 +00:00
|
|
|
var upload models.TusUpload
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Model(&models.TusUpload{}).Where(&models.TusUpload{Hash: hashStr}).First(&upload)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
return result.RowsAffected > 0, upload
|
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) CreateTusUpload(hash []byte, uploadID string, uploaderID uint, uploaderIP string, protocol string) (*models.TusUpload, error) {
|
2024-01-19 20:51:31 +00:00
|
|
|
hashStr := hex.EncodeToString(hash)
|
|
|
|
|
|
|
|
upload := &models.TusUpload{
|
|
|
|
Hash: hashStr,
|
|
|
|
UploadID: uploadID,
|
|
|
|
UploaderID: uploaderID,
|
|
|
|
UploaderIP: uploaderIP,
|
|
|
|
Uploader: models.User{},
|
|
|
|
Protocol: protocol,
|
|
|
|
}
|
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Create(upload)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if result.Error != nil {
|
|
|
|
return nil, result.Error
|
|
|
|
}
|
|
|
|
|
|
|
|
return upload, nil
|
|
|
|
}
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) TusUploadProgress(uploadID string) error {
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
find := &models.TusUpload{UploadID: uploadID}
|
|
|
|
|
|
|
|
var upload models.TusUpload
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if result.RowsAffected == 0 {
|
|
|
|
return errors.New("upload not found")
|
|
|
|
}
|
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
result = s.db.Model(&models.TusUpload{}).Where(find).Update("updated_at", time.Now())
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if result.Error != nil {
|
|
|
|
return result.Error
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) TusUploadCompleted(uploadID string) error {
|
2024-01-22 22:49:03 +00:00
|
|
|
|
|
|
|
find := &models.TusUpload{UploadID: uploadID}
|
|
|
|
|
|
|
|
var upload models.TusUpload
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
2024-01-22 22:49:03 +00:00
|
|
|
|
|
|
|
if result.RowsAffected == 0 {
|
|
|
|
return errors.New("upload not found")
|
|
|
|
}
|
|
|
|
|
2024-01-28 07:20:59 +00:00
|
|
|
result = s.db.Model(&models.TusUpload{}).Where(find).Update("completed", true)
|
2024-01-22 22:49:03 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) DeleteTusUpload(uploadID string) error {
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Where(&models.TusUpload{UploadID: uploadID}).Delete(&models.TusUpload{})
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if result.Error != nil {
|
|
|
|
return result.Error
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) ScheduleTusUpload(uploadID string) error {
|
2024-01-19 20:51:31 +00:00
|
|
|
find := &models.TusUpload{UploadID: uploadID}
|
|
|
|
|
|
|
|
var upload models.TusUpload
|
2024-01-28 07:20:59 +00:00
|
|
|
result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if result.RowsAffected == 0 {
|
|
|
|
return errors.New("upload not found")
|
|
|
|
}
|
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
task := s.cron.RetryableTask(cron.RetryableTaskParams{
|
|
|
|
Name: "tusUpload",
|
|
|
|
Function: s.tusUploadTask,
|
|
|
|
Args: []interface{}{&upload},
|
|
|
|
Attempt: 0,
|
|
|
|
Limit: 0,
|
|
|
|
After: func(jobID uuid.UUID, jobName string) {
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Info("Job finished", zap.String("jobName", jobName), zap.String("uploadID", uploadID))
|
2024-01-19 20:51:31 +00:00
|
|
|
err := s.DeleteTusUpload(uploadID)
|
|
|
|
if err != nil {
|
2024-01-28 07:20:59 +00:00
|
|
|
s.logger.Error("Error deleting tus upload", zap.Error(err))
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
2024-01-28 21:26:15 +00:00
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
_, err := s.cron.CreateJob(task)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
|
2024-01-28 21:26:15 +00:00
|
|
|
ctx := context.Background()
|
|
|
|
tusUpload, err := s.tusStore.GetUpload(ctx, upload.UploadID)
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not get upload", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-02-01 07:03:04 +00:00
|
|
|
readerHash, err := tusUpload.GetReader(ctx)
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not get tus file", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-02-01 07:03:04 +00:00
|
|
|
defer func(reader io.ReadCloser) {
|
|
|
|
err := reader.Close()
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not close reader", zap.Error(err))
|
|
|
|
}
|
|
|
|
}(readerHash)
|
|
|
|
|
|
|
|
hash, byteCount, err := s.GetHash(readerHash)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not compute hash", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
dbHash, err := hex.DecodeString(upload.Hash)
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not decode hash", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-02-09 20:23:33 +00:00
|
|
|
if !bytes.Equal(hash.Hash, dbHash) {
|
2024-01-28 21:26:15 +00:00
|
|
|
s.logger.Error("Hashes do not match", zap.Any("upload", upload), zap.Any("hash", hash), zap.Any("dbHash", dbHash))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-02-01 07:03:04 +00:00
|
|
|
readerMime, err := tusUpload.GetReader(ctx)
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not get tus file", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-02-01 07:03:04 +00:00
|
|
|
defer func(reader io.ReadCloser) {
|
|
|
|
err := reader.Close()
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not close reader", zap.Error(err))
|
|
|
|
}
|
|
|
|
}(readerMime)
|
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
var mimeBuf [512]byte
|
2024-01-20 17:26:31 +00:00
|
|
|
|
2024-02-01 07:03:04 +00:00
|
|
|
_, err = readerMime.Read(mimeBuf[:])
|
2024-01-26 00:05:52 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not read mime", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-26 00:05:52 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
mimeType := http.DetectContentType(mimeBuf[:])
|
2024-01-26 00:05:52 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
upload.MimeType = mimeType
|
2024-01-26 00:05:52 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
if tx := s.db.Save(upload); tx.Error != nil {
|
|
|
|
s.logger.Error("Could not update tus upload", zap.Error(tx.Error))
|
|
|
|
return tx.Error
|
|
|
|
}
|
2024-01-26 00:05:52 +00:00
|
|
|
|
2024-02-09 20:23:33 +00:00
|
|
|
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash.Hash)).ToBase64Url()
|
2024-02-01 07:03:04 +00:00
|
|
|
err = s.renter.CreateBucketIfNotExists(upload.Protocol)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
info, err := tusUpload.GetInfo(context.Background())
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
2024-02-01 07:03:04 +00:00
|
|
|
s.logger.Error("Could not get tus info", zap.Error(err))
|
2024-01-28 21:26:15 +00:00
|
|
|
return err
|
|
|
|
}
|
2024-01-26 00:05:52 +00:00
|
|
|
|
2024-02-01 07:03:04 +00:00
|
|
|
err = s.renter.MultipartUpload(renter.MultiPartUploadParams{
|
|
|
|
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
|
|
|
|
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
|
|
|
|
ctx = context.WithValue(ctx, "range", rangeHeader)
|
|
|
|
return tusUpload.GetReader(ctx)
|
|
|
|
},
|
|
|
|
Bucket: upload.Protocol,
|
2024-02-02 21:45:50 +00:00
|
|
|
FileName: "/" + hashStr,
|
2024-02-01 07:03:04 +00:00
|
|
|
Size: uint64(info.Size),
|
|
|
|
})
|
2024-01-26 00:05:52 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not upload file", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-02-09 20:23:33 +00:00
|
|
|
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hash.Proof), upload.Protocol, fmt.Sprintf("%s.bao", hashStr))
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
s3InfoId, _ := splitS3Ids(upload.UploadID)
|
2024-01-21 05:08:49 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
_, err = s.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
|
|
|
|
Bucket: aws.String(s.config.GetString("core.storage.s3.bufferBucket")),
|
|
|
|
Delete: &s3types.Delete{
|
|
|
|
Objects: []s3types.ObjectIdentifier{
|
|
|
|
{
|
|
|
|
Key: aws.String(s3InfoId),
|
2024-01-23 00:06:28 +00:00
|
|
|
},
|
2024-01-28 21:26:15 +00:00
|
|
|
{
|
|
|
|
Key: aws.String(s3InfoId + ".info"),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
Quiet: aws.Bool(true),
|
|
|
|
},
|
|
|
|
})
|
2024-01-22 23:25:11 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not delete upload metadata", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-22 23:25:11 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
newUpload, err := s.CreateUpload(dbHash, mimeType, upload.UploaderID, upload.UploaderIP, uint64(byteCount), upload.Protocol)
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not create upload", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-23 00:08:56 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
err = s.accounts.PinByID(newUpload.ID, upload.UploaderID)
|
|
|
|
if err != nil {
|
|
|
|
s.logger.Error("Could not pin upload", zap.Error(err))
|
|
|
|
return err
|
|
|
|
}
|
2024-01-19 20:51:31 +00:00
|
|
|
|
2024-01-28 21:26:15 +00:00
|
|
|
return nil
|
2024-01-19 20:51:31 +00:00
|
|
|
}
|
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) getPrefixedHash(hash []byte) []byte {
|
2024-01-19 20:51:31 +00:00
|
|
|
return append([]byte{byte(types.HashTypeBlake3)}, hash...)
|
|
|
|
}
|
2024-01-22 23:54:19 +00:00
|
|
|
|
|
|
|
func splitS3Ids(id string) (objectId, multipartId string) {
|
|
|
|
index := strings.Index(id, "+")
|
|
|
|
if index == -1 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
objectId = id[:index]
|
|
|
|
multipartId = id[index+1:]
|
|
|
|
return
|
|
|
|
}
|
2024-01-24 06:26:40 +00:00
|
|
|
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) GetFile(hash []byte, start int64) (io.ReadCloser, int64, error) {
|
2024-01-24 06:26:40 +00:00
|
|
|
if exists, tusUpload := s.TusUploadExists(hash); exists {
|
|
|
|
if tusUpload.Completed {
|
|
|
|
upload, err := s.tusStore.GetUpload(context.Background(), tusUpload.UploadID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
info, _ := upload.GetInfo(context.Background())
|
2024-01-25 14:50:17 +00:00
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
|
|
|
if start > 0 {
|
|
|
|
endPosition := start + info.Size - 1
|
|
|
|
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, endPosition)
|
|
|
|
ctx = context.WithValue(ctx, "range", rangeHeader)
|
|
|
|
}
|
|
|
|
|
|
|
|
reader, err := upload.GetReader(ctx)
|
2024-01-24 06:26:40 +00:00
|
|
|
|
2024-01-25 00:05:54 +00:00
|
|
|
return reader, info.Size, err
|
2024-01-24 06:26:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
exists, upload := s.FileExists(hash)
|
|
|
|
|
|
|
|
if !exists {
|
|
|
|
return nil, 0, errors.New("file does not exist")
|
|
|
|
}
|
|
|
|
|
2024-01-24 07:00:53 +00:00
|
|
|
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
|
|
|
|
if err != nil {
|
|
|
|
return nil, 0, err
|
|
|
|
}
|
2024-01-24 06:26:40 +00:00
|
|
|
|
2024-01-25 13:37:15 +00:00
|
|
|
var partialRange api.DownloadRange
|
2024-01-25 00:05:54 +00:00
|
|
|
|
|
|
|
if start > 0 {
|
2024-01-25 13:37:15 +00:00
|
|
|
partialRange = api.DownloadRange{
|
|
|
|
Offset: start,
|
|
|
|
Length: int64(upload.Size) - start + 1,
|
|
|
|
Size: int64(upload.Size),
|
|
|
|
}
|
2024-01-25 00:05:54 +00:00
|
|
|
}
|
|
|
|
|
2024-02-01 02:27:38 +00:00
|
|
|
object, err := s.renter.GetObject(context.Background(), upload.Protocol, hashStr, api.DownloadObjectOptions{
|
2024-01-25 13:37:15 +00:00
|
|
|
Range: partialRange,
|
|
|
|
})
|
2024-01-24 06:26:40 +00:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, 0, err
|
|
|
|
}
|
|
|
|
|
2024-01-25 13:37:15 +00:00
|
|
|
return object.Content, int64(upload.Size), nil
|
2024-01-24 06:26:40 +00:00
|
|
|
}
|
2024-02-01 01:29:27 +00:00
|
|
|
func (s *StorageServiceDefault) NewFile(hash []byte) *FileImpl {
|
2024-02-09 20:42:53 +00:00
|
|
|
return NewFile(FileParams{
|
|
|
|
Storage: s,
|
|
|
|
Renter: s.renter,
|
|
|
|
Hash: hash,
|
|
|
|
})
|
2024-01-25 21:31:05 +00:00
|
|
|
}
|