From 6acf8a606a8ae8a476b265a79683f7a08489c9a6 Mon Sep 17 00:00:00 2001
From: Derrick Hammer
Date: Fri, 19 Jan 2024 15:51:31 -0500
Subject: [PATCH] feat: initial tus protocol and processing support

---
 interfaces/storage.go |  19 ++-
 storage/storage.go    | 367 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 375 insertions(+), 11 deletions(-)

diff --git a/interfaces/storage.go b/interfaces/storage.go
index 713212f..657113b 100644
--- a/interfaces/storage.go
+++ b/interfaces/storage.go
@@ -2,13 +2,26 @@ package interfaces
 
 import (
 	"git.lumeweb.com/LumeWeb/portal/db/models"
+	tusd "github.com/tus/tusd/v2/pkg/handler"
 	"io"
 )
 
+type TusPreUploadCreateCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error)
+type TusPreFinishResponseCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, error)
+
 type StorageService interface {
-	Init()
-	PutFile(file io.ReadSeeker, bucket string, generateProof bool) ([]byte, error)
+	Portal() Portal
+	PutFileSmall(file io.ReadSeeker, bucket string, generateProof bool) ([]byte, error)
+	PutFile(file io.Reader, bucket string, hash []byte) error
+	BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, error)
 	FileExists(hash []byte) (bool, models.Upload)
-	GetHash(file io.ReadSeeker) ([]byte, error)
+	GetHashSmall(file io.ReadSeeker) ([]byte, error)
+	GetHash(file io.Reader) ([]byte, error)
 	CreateUpload(hash []byte, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error)
+	TusUploadExists(hash []byte) (bool, models.TusUpload)
+	CreateTusUpload(hash []byte, uploadID string, uploaderID uint, uploaderIP string, protocol string) (*models.TusUpload, error)
+	TusUploadProgress(uploadID string) error
+	DeleteTusUpload(uploadID string) error
+	ScheduleTusUpload(uploadID string, attempt int) error
+	Service
 }
diff --git a/storage/storage.go b/storage/storage.go
index 3fd067b..877a714 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -2,14 +2,27 @@ package storage
 
 import (
 	"bytes"
+	"context"
 	"encoding/hex"
 	"errors"
 	"git.lumeweb.com/LumeWeb/libs5-go/encoding"
+	"git.lumeweb.com/LumeWeb/libs5-go/types"
+	"git.lumeweb.com/LumeWeb/portal/api/s5"
 	"git.lumeweb.com/LumeWeb/portal/db/models"
 	"git.lumeweb.com/LumeWeb/portal/interfaces"
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/go-co-op/gocron/v2"
 	"github.com/go-resty/resty/v2"
+	"github.com/google/uuid"
+	tusd "github.com/tus/tusd/v2/pkg/handler"
+	s3store "github.com/tus/tusd/v2/pkg/s3store"
+	"go.uber.org/zap"
 	"io"
 	"lukechampine.com/blake3"
+	"time"
 )
 
 var (
@@ -17,8 +30,18 @@ var (
 )
 
 type StorageServiceImpl struct {
-	portal  interfaces.Portal
-	httpApi *resty.Client
+	portal   interfaces.Portal
+	httpApi  *resty.Client
+	tus      *tusd.Handler
+	tusStore tusd.DataStore
+}
+
+func (s *StorageServiceImpl) Start() error {
+	return nil
+}
+
+func (s *StorageServiceImpl) Portal() interfaces.Portal {
+	return s.portal
 }
 
 func NewStorageService(portal interfaces.Portal) interfaces.StorageService {
@@ -28,10 +51,9 @@ func NewStorageService(portal interfaces.Portal) interfaces.StorageService {
 	}
 }
 
-func (s StorageServiceImpl) PutFile(file io.ReadSeeker, bucket string, generateProof bool) ([]byte, error) {
-
-	hash, err := s.GetHash(file)
-	hashStr, err := encoding.NewMultihash(hash[:]).ToBase64Url()
+func (s StorageServiceImpl) PutFileSmall(file io.ReadSeeker, bucket string, generateProof bool) ([]byte, error) {
+	hash, err := s.GetHashSmall(file)
+	if err != nil {
+		return nil, err
+	}
+
+	hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
 	if err != nil {
 		return nil, err
 	}
@@ -67,8 +89,82 @@ func (s StorageServiceImpl) PutFile(file io.ReadSeeker, bucket string, generateProof bool) ([]byte, error) {
 
 	return hash[:], nil
 }
 
+func (s StorageServiceImpl) PutFile(file io.Reader, bucket string, hash []byte) error {
+	hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
+	if err != nil {
+		return err
+	}
+
+	err = s.createBucketIfNotExists(bucket)
+	if err != nil {
+		return err
+	}
+
+	resp, err := s.httpApi.R().
+		SetPathParam("path", hashStr).
+		SetFormData(map[string]string{
+			"bucket": bucket,
+		}).
+		SetBody(file).Put("/api/worker/objects/{path}")
+	if err != nil {
+		return err
+	}
+
+	if resp.IsError() {
+		if resp.Error() != nil {
+			return resp.Error().(error)
+		}
+
+		return errors.New(resp.String())
+	}
+
+	return nil
+}
+
+func (s *StorageServiceImpl) BuildUploadBufferTus(basePath string, preUploadCb interfaces.TusPreUploadCreateCallback, preFinishCb interfaces.TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, error) {
+	customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
+		if service == s3.ServiceID {
+			return aws.Endpoint{
+				URL:           s.portal.Config().GetString("core.storage.s3.endpoint"),
+				SigningRegion: s.portal.Config().GetString("core.storage.s3.region"),
+			}, nil
+		}
+		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
+	})
+
+	cfg, err := config.LoadDefaultConfig(context.TODO(),
+		config.WithRegion("us-east-1"),
+		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
+			s.portal.Config().GetString("core.storage.s3.accessKey"),
+			s.portal.Config().GetString("core.storage.s3.secretKey"),
+			"",
+		)),
+		config.WithEndpointResolverWithOptions(customResolver),
+	)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	s3Client := s3.NewFromConfig(cfg)
+
+	store := s3store.New(s.portal.Config().GetString("core.storage.s3.bufferBucket"), s3Client)
+
+	locker := NewMySQLLocker(s)
+
+	composer := tusd.NewStoreComposer()
+	store.UseIn(composer)
+	composer.UseLocker(locker)
+
+	// NotifyCreatedUploads and NotifyUploadProgress must be enabled for
+	// tusWorker to receive events on the CreatedUploads and UploadProgress
+	// channels it selects on.
+	handler, err := tusd.NewHandler(tusd.Config{
+		BasePath:                  basePath,
+		StoreComposer:             composer,
+		DisableDownload:           true,
+		NotifyCreatedUploads:      true,
+		NotifyUploadProgress:      true,
+		NotifyCompleteUploads:     true,
+		NotifyTerminatedUploads:   true,
+		PreUploadCreateCallback:   preUploadCb,
+		PreFinishResponseCallback: preFinishCb,
+	})
+
+	return handler, store, err
+}
+
-func (s *StorageServiceImpl) Init() {
+func (s *StorageServiceImpl) Init() error {
 	client := resty.New()
 
 	client.SetDisableWarn(true)
@@ -76,6 +172,49 @@ func (s *StorageServiceImpl) Init() {
 	client.SetBasicAuth("", s.portal.Config().GetString("core.sia.key"))
 
 	s.httpApi = client
+
+	preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) {
+		blankResp := tusd.HTTPResponse{}
+		blankChanges := tusd.FileInfoChanges{}
+
+		hash, ok := hook.Upload.MetaData["hash"]
+		if !ok {
+			return blankResp, blankChanges, errors.New("missing hash")
+		}
+
+		decodedHash, err := encoding.MultihashFromBase64Url(hash)
+		if err != nil {
+			return blankResp, blankChanges, err
+		}
+
+		exists, _ := s.FileExists(decodedHash.HashBytes())
+		if exists {
+			return blankResp, blankChanges, errors.New("file already exists")
+		}
+
+		exists, _ = s.TusUploadExists(decodedHash.HashBytes())
+		if exists {
+			return blankResp, blankChanges, errors.New("file is already being uploaded")
+		}
+
+		return blankResp, blankChanges, nil
+	}
+
+	tus, store, err := s.BuildUploadBufferTus("/s5/upload/tus", preUpload, nil)
+	if err != nil {
+		return err
+	}
+
+	s.tus = tus
+	s.tusStore = store
+
+	go s.tusWorker()
+
+	return nil
 }
 
 func (s *StorageServiceImpl) createBucketIfNotExists(bucket string) error {
 	resp, err := s.httpApi.R().
@@ -117,7 +256,7 @@ func (s *StorageServiceImpl) FileExists(hash []byte) (bool, models.Upload) {
 	return result.RowsAffected > 0, upload
 }
 
-func (s *StorageServiceImpl) GetHash(file io.ReadSeeker) ([]byte, error) {
+func (s *StorageServiceImpl) GetHashSmall(file io.ReadSeeker) ([]byte, error) {
 	buf := bytes.NewBuffer(nil)
 
 	_, err := io.Copy(buf, file)
@@ -129,6 +268,20 @@ func (s *StorageServiceImpl) GetHash(file io.ReadSeeker) ([]byte, error) {
 	return hash[:], nil
 }
 
+func (s *StorageServiceImpl) GetHash(file io.Reader) ([]byte, error) {
+	// Use a 32-byte digest so streamed hashes match the blake3.Sum256
+	// output produced by GetHashSmall.
+	hasher := blake3.New(32, nil)
+
+	_, err := io.Copy(hasher, file)
+	if err != nil {
+		return nil, err
+	}
+
+	hash := hasher.Sum(nil)
+
+	return hash, nil
+}
+
 func (s *StorageServiceImpl) CreateUpload(hash []byte, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error) {
 	hashStr := hex.EncodeToString(hash)
 
@@ -148,3 +301,201 @@ func (s *StorageServiceImpl) CreateUpload(hash []byte, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error) {
 	return upload, nil
 }
+
+func (s *StorageServiceImpl) tusWorker() {
+	for {
+		select {
+		case info := <-s.tus.CreatedUploads:
+			hash, ok := info.Upload.MetaData["hash"]
+			if !ok {
+				s.portal.Logger().Error("Missing hash in metadata")
+				continue
+			}
+
+			uploaderID, ok := info.Context.Value(s5.AuthUserIDKey).(uint)
+			if !ok {
+				s.portal.Logger().Error("Missing user id in context")
+				continue
+			}
+
+			protocol, ok := info.Context.Value("protocol").(string)
+			if !ok {
+				s.portal.Logger().Error("Missing protocol in context")
+				continue
+			}
+
+			uploaderIP := info.HTTPRequest.RemoteAddr
+
+			decodedHash, err := encoding.MultihashFromBase64Url(hash)
+			if err != nil {
+				s.portal.Logger().Error("Could not decode hash", zap.Error(err))
+				continue
+			}
+
+			_, err = s.CreateTusUpload(decodedHash.HashBytes(), info.Upload.ID, uploaderID, uploaderIP, protocol)
+			if err != nil {
+				s.portal.Logger().Error("Could not create tus upload", zap.Error(err))
+				continue
+			}
+		case info := <-s.tus.UploadProgress:
+			err := s.TusUploadProgress(info.Upload.ID)
+			if err != nil {
+				s.portal.Logger().Error("Could not update tus upload", zap.Error(err))
+				continue
+			}
+		case info := <-s.tus.TerminatedUploads:
+			err := s.DeleteTusUpload(info.Upload.ID)
+			if err != nil {
+				s.portal.Logger().Error("Could not delete tus upload", zap.Error(err))
+				continue
+			}
+		case info := <-s.tus.CompleteUploads:
+			err := s.ScheduleTusUpload(info.Upload.ID, 0)
+			if err != nil {
+				s.portal.Logger().Error("Could not schedule tus upload", zap.Error(err))
+				continue
+			}
+		}
+	}
+}
+
+func (s *StorageServiceImpl) TusUploadExists(hash []byte) (bool, models.TusUpload) {
+	hashStr := hex.EncodeToString(hash)
+
+	var upload models.TusUpload
+	result := s.portal.Database().Model(&models.TusUpload{}).Where(&models.TusUpload{Hash: hashStr}).First(&upload)
+
+	return result.RowsAffected > 0, upload
+}
+
+func (s *StorageServiceImpl) CreateTusUpload(hash []byte, uploadID string, uploaderID uint, uploaderIP string, protocol string) (*models.TusUpload, error) {
+	hashStr := hex.EncodeToString(hash)
+
+	upload := &models.TusUpload{
+		Hash:       hashStr,
+		UploadID:   uploadID,
+		UploaderID: uploaderID,
+		UploaderIP: uploaderIP,
+		Uploader:   models.User{},
+		Protocol:   protocol,
+	}
+
+	result := s.portal.Database().Create(upload)
+
+	if result.Error != nil {
+		return nil, result.Error
+	}
+
+	return upload, nil
+}
+
+func (s *StorageServiceImpl) TusUploadProgress(uploadID string) error {
+	find := &models.TusUpload{UploadID: uploadID}
+
+	var upload models.TusUpload
+	result := s.portal.Database().Model(&models.TusUpload{}).Where(find).First(&upload)
+
+	if result.RowsAffected == 0 {
+		return errors.New("upload not found")
+	}
+
+	result = s.portal.Database().Model(&models.TusUpload{}).Where(find).Update("updated_at", time.Now())
+
+	if result.Error != nil {
+		return result.Error
+	}
+
+	return nil
+}
+
+func (s *StorageServiceImpl) DeleteTusUpload(uploadID string) error {
+	result := s.portal.Database().Where(&models.TusUpload{UploadID: uploadID}).Delete(&models.TusUpload{})
+
+	if result.Error != nil {
+		return result.Error
+	}
+
+	return nil
+}
+
+func (s *StorageServiceImpl) ScheduleTusUpload(uploadID string, attempt int) error {
+	find := &models.TusUpload{UploadID: uploadID}
+
+	var upload models.TusUpload
+	result := s.portal.Database().Model(&models.TusUpload{}).Where(find).First(&upload)
+
+	if result.RowsAffected == 0 {
+		return errors.New("upload not found")
+	}
+
+	job, task := s.buildNewTusUploadTask(&upload)
+
+	if attempt > 0 {
+		// Back off retries by one minute per attempt.
+		job = gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(time.Now().Add(time.Duration(attempt) * time.Minute)))
+	}
+
+	_, err := s.portal.Cron().NewJob(job, task, gocron.WithEventListeners(
+		gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
+			s.portal.Logger().Error("Error running job", zap.Error(err))
+			err = s.ScheduleTusUpload(uploadID, attempt+1)
+			if err != nil {
+				s.portal.Logger().Error("Error rescheduling job", zap.Error(err))
+			}
+		}),
+		gocron.AfterJobRuns(func(jobID uuid.UUID, jobName string) {
+			s.portal.Logger().Info("Job finished", zap.String("jobName", jobName), zap.String("uploadID", uploadID))
+			err := s.DeleteTusUpload(uploadID)
+			if err != nil {
+				s.portal.Logger().Error("Error deleting tus upload", zap.Error(err))
+			}
+		})))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *StorageServiceImpl) buildNewTusUploadTask(upload *models.TusUpload) (job gocron.JobDefinition, task gocron.Task) {
+	job = gocron.OneTimeJob(gocron.OneTimeJobStartImmediately())
+
+	task = gocron.NewTask(
+		func(upload *models.TusUpload) error {
+			ctx := context.Background()
+
+			tusUpload, err := s.tusStore.GetUpload(ctx, upload.UploadID)
+			if err != nil {
+				s.portal.Logger().Error("Could not get upload", zap.Error(err))
+				return err
+			}
+
+			reader, err := tusUpload.GetReader(ctx)
+			if err != nil {
+				s.portal.Logger().Error("Could not get tus file", zap.Error(err))
+				return err
+			}
+
+			hash, err := s.GetHash(reader)
+			if err != nil {
+				s.portal.Logger().Error("Could not compute hash", zap.Error(err))
+				return err
+			}
+
+			dbHash, err := hex.DecodeString(upload.Hash)
+			if err != nil {
+				s.portal.Logger().Error("Could not decode hash", zap.Error(err))
+				return err
+			}
+
+			if !bytes.Equal(hash, dbHash) {
+				s.portal.Logger().Error("Hashes do not match", zap.Any("upload", upload), zap.Any("hash", hash), zap.Any("dbHash", dbHash))
+				return errors.New("hashes do not match")
+			}
+
+			// Hashing consumed the reader, so fetch a fresh one before uploading.
+			reader, err = tusUpload.GetReader(ctx)
+			if err != nil {
+				s.portal.Logger().Error("Could not get tus file", zap.Error(err))
+				return err
+			}
+
+			err = s.PutFile(reader, s.portal.Config().GetString("core.storage.s3.bucket"), dbHash)
+			if err != nil {
+				s.portal.Logger().Error("Could not upload file", zap.Error(err))
+				return err
+			}
+
+			return nil
+		}, upload)
+
+	return job, task
+}
+
+func (s *StorageServiceImpl) getPrefixedHash(hash []byte) []byte {
+	return append([]byte{byte(types.HashTypeBlake3)}, hash...)
+}
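
Note for reviewers: the patch builds the tus handler inside Init but leaves mounting it to the HTTP layer. Below is a minimal sketch of how a caller might attach the handler returned by BuildUploadBufferTus to a standard net/http mux; the mountTus helper, the mux wiring, and the nil callbacks are illustrative assumptions, not part of this patch.

package main

import (
	"net/http"

	"git.lumeweb.com/LumeWeb/portal/interfaces"
)

// mountTus is a hypothetical helper (not in this patch) showing one way to
// expose the *tusd.Handler built by BuildUploadBufferTus. Following the
// upstream tusd examples, the handler is mounted at its BasePath with the
// prefix stripped so it sees paths relative to that base.
func mountTus(storage interfaces.StorageService, mux *http.ServeMux) error {
	handler, _, err := storage.BuildUploadBufferTus("/s5/upload/tus/", nil, nil)
	if err != nil {
		return err
	}

	mux.Handle("/s5/upload/tus/", http.StripPrefix("/s5/upload/tus/", handler))
	return nil
}

In the patch itself this wiring is unnecessary for the worker path: Init stores the handler on the service and tusWorker consumes its event channels directly.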