From 93e727ab3b67e31897b9d973f7101ea1c0ad455c Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 16 Feb 2024 22:00:53 -0500 Subject: [PATCH] refactor: epic protocol and storage design refactor --- api/s5/file.go | 234 ++++++++ api/s5/s5.go | 210 ++++--- {storage => protocols/s5}/locker.go | 10 +- protocols/s5/s5.go | 66 ++- protocols/s5/tus.go | 444 +++++++++++++++ storage/file.go | 210 ------- storage/storage.go | 822 +++++++--------------------- 7 files changed, 1025 insertions(+), 971 deletions(-) create mode 100644 api/s5/file.go rename {storage => protocols/s5}/locker.go (97%) create mode 100644 protocols/s5/tus.go delete mode 100644 storage/file.go diff --git a/api/s5/file.go b/api/s5/file.go new file mode 100644 index 0000000..a590d6a --- /dev/null +++ b/api/s5/file.go @@ -0,0 +1,234 @@ +package s5 + +import ( + "context" + "encoding/hex" + "errors" + "io" + "time" + + "git.lumeweb.com/LumeWeb/portal/protocols/s5" + + "git.lumeweb.com/LumeWeb/portal/metadata" + + "git.lumeweb.com/LumeWeb/portal/storage" + + "git.lumeweb.com/LumeWeb/libs5-go/encoding" + "git.lumeweb.com/LumeWeb/libs5-go/types" +) + +var _ io.ReadSeekCloser = (*S5File)(nil) + +type S5File struct { + reader io.ReadCloser + hash []byte + storage storage.StorageService + metadata metadata.MetadataService + record *metadata.UploadMetadata + protocol *s5.S5Protocol + cid *encoding.CID + read bool + tus *s5.TusHandler +} + +type FileParams struct { + Storage storage.StorageService + Metadata metadata.MetadataService + Hash []byte + Protocol *s5.S5Protocol + Tus *s5.TusHandler +} + +func NewFile(params FileParams) *S5File { + return &S5File{ + storage: params.Storage, + metadata: params.Metadata, + hash: params.Hash, + protocol: params.Protocol, + tus: params.Tus, + } +} + +func (f *S5File) Exists() bool { + _, err := f.metadata.GetUpload(context.Background(), f.hash) + + if err != nil { + return false + } + + return true +} + +func (f *S5File) Read(p []byte) (n int, err error) { + err = f.init(0) + if err != nil { + return 0, err + } + f.read = true + + return f.reader.Read(p) +} + +func (f *S5File) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekStart: + if !f.read { + return 0, nil + } + + if f.reader != nil { + err := f.reader.Close() + if err != nil { + return 0, err + } + f.reader = nil + } + err := f.init(offset) + if err != nil { + return 0, err + } + case io.SeekCurrent: + return 0, errors.New("not supported") + case io.SeekEnd: + return int64(f.Size()), nil + default: + return 0, errors.New("invalid whence") + } + + return 0, nil +} + +func (f *S5File) Close() error { + if f.reader != nil { + r := f.reader + f.reader = nil + return r.Close() + } + + return nil +} + +func (f *S5File) init(offset int64) error { + if f.reader == nil { + + reader, err := f.tus.GetTusUploadReader(f.hash, offset) + + if err == nil { + f.reader = reader + f.read = false + return nil + } + + reader, err = f.storage.DownloadObject(context.Background(), f.StorageProtocol(), f.hash, offset) + if err != nil { + return err + } + + f.reader = reader + f.read = false + } + + return nil +} + +func (f *S5File) Record() (*metadata.UploadMetadata, error) { + if f.record == nil { + record, err := f.metadata.GetUpload(context.Background(), f.Hash()) + + if err != nil { + return nil, errors.New("file does not exist") + } + + f.record = &record + } + + return f.record, nil +} + +func (f *S5File) Hash() []byte { + hashStr := f.HashString() + + if hashStr == "" { + return nil + } + + str, err := 
hex.DecodeString(hashStr) + if err != nil { + return nil + } + + return str +} + +func (f *S5File) HashString() string { + record, err := f.Record() + if err != nil { + return "" + } + + return hex.EncodeToString(record.Hash) +} + +func (f *S5File) Name() string { + cid, _ := f.CID().ToString() + + return cid +} + +func (f *S5File) Modtime() time.Time { + record, err := f.Record() + if err != nil { + return time.Unix(0, 0) + } + + return record.Created +} +func (f *S5File) Size() uint64 { + record, err := f.Record() + if err != nil { + return 0 + } + + return record.Size +} +func (f *S5File) CID() *encoding.CID { + if f.cid == nil { + multihash := encoding.MultihashFromBytes(f.Hash(), types.HashTypeBlake3) + cid := encoding.NewCID(types.CIDTypeRaw, *multihash, f.Size()) + f.cid = cid + } + return f.cid +} + +func (f *S5File) Mime() string { + record, err := f.Record() + if err != nil { + return "" + } + + return record.MimeType +} + +func (f *S5File) StorageProtocol() storage.StorageProtocol { + return s5.GetStorageProtocol(f.protocol) +} + +func (f *S5File) Proof() ([]byte, error) { + object, err := f.storage.DownloadObjectProof(context.Background(), f.StorageProtocol(), f.hash) + + if err != nil { + return nil, err + } + + proof, err := io.ReadAll(object) + if err != nil { + return nil, err + } + + err = object.Close() + if err != nil { + return nil, err + } + + return proof, nil +} diff --git a/api/s5/s5.go b/api/s5/s5.go index d027c77..b62f0a2 100644 --- a/api/s5/s5.go +++ b/api/s5/s5.go @@ -21,8 +21,14 @@ import ( "strings" "time" + "git.lumeweb.com/LumeWeb/portal/metadata" + + "git.lumeweb.com/LumeWeb/portal/storage" + + "github.com/getkin/kin-openapi/openapi3" + "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/metadata" + s5libmetadata "git.lumeweb.com/LumeWeb/libs5-go/metadata" "git.lumeweb.com/LumeWeb/libs5-go/node" "git.lumeweb.com/LumeWeb/libs5-go/protocol" "git.lumeweb.com/LumeWeb/libs5-go/service" @@ -43,8 +49,6 @@ import ( "git.lumeweb.com/LumeWeb/portal/api/registry" protoRegistry "git.lumeweb.com/LumeWeb/portal/protocols/registry" "git.lumeweb.com/LumeWeb/portal/protocols/s5" - "git.lumeweb.com/LumeWeb/portal/storage" - "github.com/getkin/kin-openapi/openapi3" "github.com/rs/cors" "github.com/spf13/viper" "go.sia.tech/jape" @@ -64,26 +68,28 @@ var spec []byte var swagfs embed.FS type S5API struct { - config *viper.Viper - identity ed25519.PrivateKey - accounts *account.AccountServiceDefault - storage *storage.StorageServiceDefault - db *gorm.DB - protocols []protoRegistry.Protocol - httpHandler HttpHandler - protocol *s5.S5Protocol - logger *zap.Logger + config *viper.Viper + identity ed25519.PrivateKey + accounts *account.AccountServiceDefault + storage storage.StorageService + metadata metadata.MetadataService + db *gorm.DB + protocols []protoRegistry.Protocol + protocol *s5.S5Protocol + logger *zap.Logger + tusHandler *s5.TusHandler } type APIParams struct { fx.In - Config *viper.Viper - Identity ed25519.PrivateKey - Accounts *account.AccountServiceDefault - Storage *storage.StorageServiceDefault - Db *gorm.DB - Protocols []protoRegistry.Protocol `group:"protocol"` - Logger *zap.Logger + Config *viper.Viper + Identity ed25519.PrivateKey + Accounts *account.AccountServiceDefault + Storage storage.StorageService + Db *gorm.DB + Protocols []protoRegistry.Protocol `group:"protocol"` + Logger *zap.Logger + TusHandler *s5.TusHandler } type S5ApiResult struct { @@ -94,13 +100,14 @@ type S5ApiResult struct { func NewS5(params APIParams) 
(S5ApiResult, error) { api := &S5API{ - config: params.Config, - identity: params.Identity, - accounts: params.Accounts, - storage: params.Storage, - db: params.Db, - protocols: params.Protocols, - logger: params.Logger, + config: params.Config, + identity: params.Identity, + accounts: params.Accounts, + storage: params.Storage, + db: params.Db, + protocols: params.Protocols, + logger: params.Logger, + tusHandler: params.TusHandler, } return S5ApiResult{ API: api, @@ -146,7 +153,7 @@ func (s *S5API) Routes() *httprouter.Router { authMw := authMiddleware(authMiddlewareOpts) - tusHandler := BuildS5TusApi(authMw, s.storage) + tusHandler := BuildS5TusApi(authMw, s.tusHandler) tusOptionsHandler := func(c jape.Context) { c.ResponseWriter.WriteHeader(http.StatusOK) @@ -291,10 +298,10 @@ func BuildTusCors() func(h http.Handler) http.Handler { return mw.Handler } -func BuildS5TusApi(authMw middleware.HttpMiddlewareFunc, storage *storage.StorageServiceDefault) jape.Handler { +func BuildS5TusApi(authMw middleware.HttpMiddlewareFunc, handler *s5.TusHandler) jape.Handler { // Create a jape.Handler for your tusHandler tusJapeHandler := func(c jape.Context) { - tusHandler := storage.Tus() + tusHandler := handler.Tus() tusHandler.ServeHTTP(c.ResponseWriter, c.Request) } @@ -334,45 +341,6 @@ func (rsnc readSeekNopCloser) Close() error { return nil } -type HttpHandler struct { - config *viper.Viper - logger *zap.Logger - storage *storage.StorageServiceDefault - db *gorm.DB - accounts *account.AccountServiceDefault - protocol *s5.S5Protocol -} - -type HttpHandlerParams struct { - fx.In - - Config *viper.Viper - Logger *zap.Logger - Storage *storage.StorageServiceDefault - Db *gorm.DB - Accounts *account.AccountServiceDefault - Protocol *s5.S5Protocol -} - -type HttpHandlerResult struct { - fx.Out - - HttpHandler HttpHandler -} - -func NewHttpHandler(params HttpHandlerParams) (HttpHandlerResult, error) { - return HttpHandlerResult{ - HttpHandler: HttpHandler{ - config: params.Config, - logger: params.Logger, - storage: params.Storage, - db: params.Db, - accounts: params.Accounts, - protocol: params.Protocol, - }, - }, nil -} - func (s *S5API) smallFileUpload(jc jape.Context) { user := middleware.GetUserFromContext(jc.Request.Context()) @@ -388,8 +356,18 @@ func (s *S5API) smallFileUpload(jc jape.Context) { } }(file) - // Use PutFileSmall for the actual file upload - newUpload, err2 := s.storage.PutFileSmall(file, "s5", user, jc.Request.RemoteAddr) + newUpload, err2 := s.storage.UploadObject(jc.Request.Context(), s5.GetStorageProtocol(s.protocol), file, nil, nil) + + if err2 != nil { + s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2)) + return + } + + newUpload.UserID = user + newUpload.UploaderIP = jc.Request.RemoteAddr + + err2 = s.metadata.SaveUpload(jc.Request.Context(), *newUpload) + if err2 != nil { s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2)) return @@ -721,7 +699,6 @@ func (s *S5API) accountStats(jc jape.Context) { func (s *S5API) accountPins(jc jape.Context) { var cursor uint64 if err := jc.DecodeForm("cursor", &cursor); err != nil { - // Assuming jc.DecodeForm sends out its own error, so no need for further action here return } @@ -784,10 +761,7 @@ func (s *S5API) accountPin(jc jape.Context) { return } - hash := hex.EncodeToString(decodedCid.Hash.HashBytes()) - s.logger.Info("Processing pin request", zap.String("cid", cid), zap.String("hash", hash)) - - if err := s.accounts.PinByHash(hash, userID); err != nil { + if err := 
s.accounts.PinByHash(decodedCid.Hash.HashBytes(), userID); err != nil {
 		s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
 		return
 	}
@@ -818,32 +792,32 @@ func (s *S5API) directoryUpload(jc jape.Context) {
 		return
 	}
 
-	user := middleware.GetUserFromContext(jc.Request.Context())
-	uploads, err := s.processMultipartFiles(jc.Request, user)
+	uploads, err := s.processMultipartFiles(jc.Request)
 	if err != nil {
-		s.sendErrorResponse(jc, err) // processMultipartFiles should return a properly wrapped S5Error
+		s.sendErrorResponse(jc, err)
 		return
 	}
 
 	// Generate metadata for the directory upload
 	app, err := s.createAppMetadata(name, tryFiles, errorPages, uploads)
 	if err != nil {
-		s.sendErrorResponse(jc, err) // createAppMetadata should return a properly wrapped S5Error
+		s.sendErrorResponse(jc, err)
 		return
 	}
 
 	// Upload the metadata
-	cidStr, err := s.uploadAppMetadata(app, user, jc.Request)
+	cidStr, err := s.uploadAppMetadata(app, jc.Request)
 	if err != nil {
-		s.sendErrorResponse(jc, err) // uploadAppMetadata should return a properly wrapped S5Error
+		s.sendErrorResponse(jc, err)
 		return
 	}
 
 	jc.Encode(&AppUploadResponse{CID: cidStr})
 }
 
-func (s *S5API) processMultipartFiles(r *http.Request, user uint) (map[string]*models.Upload, error) {
-	uploadMap := make(map[string]*models.Upload)
+func (s *S5API) processMultipartFiles(r *http.Request) (map[string]*metadata.UploadMetadata, error) {
+	uploadMap := make(map[string]*metadata.UploadMetadata)
+	user := middleware.GetUserFromContext(r.Context())
 
 	for _, files := range r.MultipartForm.File {
 		for _, fileHeader := range files {
@@ -858,7 +832,15 @@ func (s *S5API) processMultipartFiles(r *http.Request, user uint) (map[string]*m
 				}
 			}(file)
 
-			upload, err := s.storage.PutFileSmall(file, "s5", user, r.RemoteAddr)
+			upload, err := s.storage.UploadObject(r.Context(), s5.GetStorageProtocol(s.protocol), file, nil, nil)
+			if err != nil {
+				return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
+			}
+
+			upload.UserID = user
+			upload.UploaderIP = r.RemoteAddr
+
+			err = s.metadata.SaveUpload(r.Context(), *upload)
 			if err != nil {
 				return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
 			}
@@ -870,20 +852,17 @@ func (s *S5API) processMultipartFiles(r *http.Request, user uint) (map[string]*m
 	return uploadMap, nil
 }
 
-func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map[int]string, uploads map[string]*models.Upload) (*metadata.WebAppMetadata, error) {
-	filesMap := make(map[string]metadata.WebAppMetadataFileReference, len(uploads))
+func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map[int]string, uploads map[string]*metadata.UploadMetadata) (*s5libmetadata.WebAppMetadata, error) {
+	filesMap := make(map[string]s5libmetadata.WebAppMetadataFileReference, len(uploads))
 
 	for filename, upload := range uploads {
-		hashDecoded, err := hex.DecodeString(upload.Hash)
-		if err != nil {
-			return nil, NewS5Error(ErrKeyInternalError, err, "Failed to decode hash for file: "+filename)
-		}
+		hash := upload.Hash
 
-		cid, err := encoding.CIDFromHash(hashDecoded, upload.Size, types.CIDTypeRaw, types.HashTypeBlake3)
+		cid, err := encoding.CIDFromHash(hash, upload.Size, types.CIDTypeRaw, types.HashTypeBlake3)
 		if err != nil {
 			return nil, NewS5Error(ErrKeyInternalError, err, "Failed to create CID for file: "+filename)
 		}
-		filesMap[filename] = metadata.WebAppMetadataFileReference{
+		filesMap[filename] = s5libmetadata.WebAppMetadataFileReference{
 			Cid:         cid,
 			ContentType: upload.MimeType,
 		}
 	}
 
@@ -894,9 +873,9 @@ func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map
 		extraMetadataMap[statusCode] = page
 	}
 
-	extraMetadata := metadata.NewExtraMetadata(extraMetadataMap)
+	extraMetadata := s5libmetadata.NewExtraMetadata(extraMetadataMap)
 	// Create the web app metadata object
-	app := metadata.NewWebAppMetadata(
+	app := s5libmetadata.NewWebAppMetadata(
 		name,
 		tryFiles,
 		*extraMetadata,
@@ -907,27 +886,37 @@ func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map
 	return app, nil
 }
 
-func (s *S5API) uploadAppMetadata(appData *metadata.WebAppMetadata, userId uint, r *http.Request) (string, error) {
+func (s *S5API) uploadAppMetadata(appData *s5libmetadata.WebAppMetadata, r *http.Request) (string, *S5Error) {
+	userId := middleware.GetUserFromContext(r.Context())
+
 	appDataRaw, err := msgpack.Marshal(appData)
 	if err != nil {
 		return "", NewS5Error(ErrKeyInternalError, err, "Failed to marshal app metadata")
 	}
 
 	file := bytes.NewReader(appDataRaw)
 
-	upload, err := s.storage.PutFileSmall(file, "s5", userId, r.RemoteAddr)
+	upload, err := s.storage.UploadObject(r.Context(), s5.GetStorageProtocol(s.protocol), file, nil, nil)
 	if err != nil {
 		return "", NewS5Error(ErrKeyStorageOperationFailed, err)
 	}
 
+	upload.UserID = userId
+	upload.UploaderIP = r.RemoteAddr
+
+	err = s.metadata.SaveUpload(r.Context(), *upload)
+	if err != nil {
+		return "", NewS5Error(ErrKeyStorageOperationFailed, err)
+	}
+
 	// Construct the CID for the newly uploaded metadata
 	cid, err := encoding.CIDFromHash(upload.Hash, uint64(len(appDataRaw)), types.CIDTypeMetadataWebapp, types.HashTypeBlake3)
 	if err != nil {
 		return "", NewS5Error(ErrKeyInternalError, err, "Failed to create CID for new app metadata")
 	}
 	cidStr, err := cid.ToString()
 	if err != nil {
 		return "", NewS5Error(ErrKeyInternalError, err, "Failed to convert CID to string for new app metadata")
 	}
 
 	return cidStr, nil
@@ -1329,9 +1318,9 @@ func (s *S5API) downloadFile(jc jape.Context) {
 	var hashBytes []byte
 	isProof := false
 
-	if strings.HasSuffix(cid, ".obao") {
+	if strings.HasSuffix(cid, storage.PROOF_EXTENSION) {
 		isProof = true
-		cid = strings.TrimSuffix(cid, ".obao")
+		cid = strings.TrimSuffix(cid, storage.PROOF_EXTENSION)
 	}
 
 	cidDecoded, err := encoding.CIDFromString(cid)
@@ -1348,7 +1337,7 @@
 		hashBytes = cidDecoded.Hash.HashBytes()
 	}
 
-	file := s.storage.NewFile(hashBytes)
+	file := s.newFile(s.protocol, hashBytes)
 	if !file.Exists() {
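+		// No upload record exists for this hash, so there is nothing to serve.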
jc.ResponseWriter.WriteHeader(http.StatusNotFound) @@ -1413,6 +1402,15 @@ func (s *S5API) newStorageLocationProvider(hash *encoding.Multihash, types ...ty }) } +func (s *S5API) newFile(protocol *s5.S5Protocol, hash []byte) *S5File { + return NewFile(FileParams{ + Protocol: protocol, + Hash: hash, + Metadata: s.metadata, + Storage: s.storage, + }) +} + func setAuthCookie(jwt string, jc jape.Context) { authCookie := http.Cookie{ Name: "s5-auth-token", diff --git a/storage/locker.go b/protocols/s5/locker.go similarity index 97% rename from storage/locker.go rename to protocols/s5/locker.go index 20e5aea..9235251 100644 --- a/storage/locker.go +++ b/protocols/s5/locker.go @@ -1,14 +1,15 @@ -package storage +package s5 import ( "context" + "os" + "sync" + "time" + "git.lumeweb.com/LumeWeb/portal/db/models" tusd "github.com/tus/tusd/v2/pkg/handler" "go.uber.org/zap" "gorm.io/gorm" - "os" - "sync" - "time" ) var ( @@ -17,7 +18,6 @@ var ( ) type MySQLLocker struct { - storage *StorageServiceDefault AcquirerPollInterval time.Duration HolderPollInterval time.Duration db *gorm.DB diff --git a/protocols/s5/s5.go b/protocols/s5/s5.go index f64d0b4..cc73f8b 100644 --- a/protocols/s5/s5.go +++ b/protocols/s5/s5.go @@ -3,7 +3,14 @@ package s5 import ( "context" "crypto/ed25519" + "errors" "fmt" + "time" + + "git.lumeweb.com/LumeWeb/portal/metadata" + + "git.lumeweb.com/LumeWeb/portal/storage" + s5config "git.lumeweb.com/LumeWeb/libs5-go/config" s5ed "git.lumeweb.com/LumeWeb/libs5-go/ed25519" "git.lumeweb.com/LumeWeb/libs5-go/encoding" @@ -12,12 +19,10 @@ import ( s5storage "git.lumeweb.com/LumeWeb/libs5-go/storage" "git.lumeweb.com/LumeWeb/libs5-go/types" "git.lumeweb.com/LumeWeb/portal/protocols/registry" - "git.lumeweb.com/LumeWeb/portal/storage" "github.com/spf13/viper" bolt "go.etcd.io/bbolt" "go.uber.org/fx" "go.uber.org/zap" - "time" ) var ( @@ -26,20 +31,22 @@ var ( ) type S5Protocol struct { - config *viper.Viper - logger *zap.Logger - storage *storage.StorageServiceDefault - identity ed25519.PrivateKey - node *s5node.Node + config *viper.Viper + logger *zap.Logger + storage storage.StorageService + identity ed25519.PrivateKey + node *s5node.Node + tusHandler *TusHandler } type S5ProtocolParams struct { fx.In Config *viper.Viper Logger *zap.Logger - Storage *storage.StorageServiceDefault + Storage storage.StorageService Identity ed25519.PrivateKey ProviderStore *S5ProviderStore + TusHandler *TusHandler } type S5ProtocolResult struct { @@ -50,8 +57,17 @@ type S5ProtocolResult struct { Db *bolt.DB } +type S5ProviderStoreParams struct { + fx.In + Config *viper.Viper + Metadata metadata.MetadataService + Logger *zap.Logger + Tus *TusHandler +} + var ProtocolModule = fx.Module("s5_api", fx.Provide(NewS5Protocol), + fx.Provide(NewTusHandler), fx.Provide(NewS5ProviderStore), fx.Replace(func(cfg *s5config.NodeConfig) *zap.Logger { return cfg.Logger @@ -63,10 +79,11 @@ func NewS5Protocol( params S5ProtocolParams, ) (S5ProtocolResult, error) { proto := &S5Protocol{ - config: params.Config, - logger: params.Logger, - storage: params.Storage, - identity: params.Identity, + config: params.Config, + logger: params.Logger, + storage: params.Storage, + identity: params.Identity, + tusHandler: params.TusHandler, } cfg, err := ConfigureS5Protocol(params) @@ -136,11 +153,12 @@ func ConfigureS5Protocol(params S5ProtocolParams) (*s5config.NodeConfig, error) return cfg, nil } -func NewS5ProviderStore(config *viper.Viper, logger *zap.Logger, storage *storage.StorageServiceDefault) *S5ProviderStore { +func 
NewS5ProviderStore(params S5ProviderStoreParams) *S5ProviderStore {
 	return &S5ProviderStore{
-		config:  config,
-		logger:  logger,
-		storage: storage,
+		config:   params.Config,
+		logger:   params.Logger,
+		tus:      params.Tus,
+		metadata: params.Metadata,
 	}
 }
 
@@ -165,6 +183,11 @@ func (s *S5Protocol) Init(args ...any) error {
 		return err
 	}
 
+	err = s.tusHandler.Init()
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 func (s *S5Protocol) Start(ctx context.Context) error {
@@ -197,9 +220,10 @@ func (s *S5Protocol) Stop(ctx context.Context) error {
 }
 
 type S5ProviderStore struct {
-	config  *viper.Viper
-	logger  *zap.Logger
-	storage *storage.StorageServiceDefault
+	config   *viper.Viper
+	logger   *zap.Logger
+	tus      *TusHandler
+	metadata metadata.MetadataService
 }
 
 func (s S5ProviderStore) CanProvide(hash *encoding.Multihash, kind []types.StorageLocationType) bool {
@@ -208,13 +232,13 @@ func (s S5ProviderStore) CanProvide(hash *encoding.Multihash, kind []types.Stora
 		case types.StorageLocationTypeArchive, types.StorageLocationTypeFile, types.StorageLocationTypeFull:
 			rawHash := hash.HashBytes()
 
-			if exists, upload := s.storage.TusUploadExists(rawHash); exists {
+			if exists, upload := s.tus.TusUploadExists(rawHash); exists {
 				if upload.Completed {
 					return true
 				}
 			}
 
-			if exists, _ := s.storage.FileExists(rawHash); exists {
-				return true
-			}
+			// A stored upload record means the object can be provided.
+			if _, err := s.metadata.GetUpload(context.Background(), rawHash); err == nil {
+				return true
+			} else if !errors.Is(err, metadata.ErrNotFound) {
+				s.logger.Error("error checking upload metadata", zap.Error(err))
+			}
 		}
diff --git a/protocols/s5/tus.go b/protocols/s5/tus.go
new file mode 100644
index 0000000..3524be7
--- /dev/null
+++ b/protocols/s5/tus.go
@@ -0,0 +1,444 @@
+package s5
+
+import (
+	"bytes"
+	"context"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+	"time"
+
+	"git.lumeweb.com/LumeWeb/portal/account"
+
+	"github.com/spf13/viper"
+
+	"git.lumeweb.com/LumeWeb/portal/metadata"
+
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/tus/tusd/v2/pkg/s3store"
+
+	tusd "github.com/tus/tusd/v2/pkg/handler"
+
+	"git.lumeweb.com/LumeWeb/portal/storage"
+	"gorm.io/gorm"
+
+	"git.lumeweb.com/LumeWeb/libs5-go/encoding"
+	"git.lumeweb.com/LumeWeb/portal/cron"
+	"git.lumeweb.com/LumeWeb/portal/db/models"
+	"git.lumeweb.com/LumeWeb/portal/renter"
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/google/uuid"
+	"go.uber.org/fx"
+	"go.uber.org/zap"
+)
+
+var (
+	_ cron.CronableService = (*TusHandler)(nil)
+)
+
+type TusHandler struct {
+	config   *viper.Viper
+	db       *gorm.DB
+	logger   *zap.Logger
+	cron     *cron.CronServiceDefault
+	storage  storage.StorageService
+	accounts *account.AccountServiceDefault
+	metadata metadata.MetadataService
+	tus      *tusd.Handler
+	tusStore tusd.DataStore
+	s3Client *s3.Client
+	protocol *S5Protocol
+}
+
+type TusHandlerParams struct {
+	fx.In
+	Config   *viper.Viper
+	Logger   *zap.Logger
+	Db       *gorm.DB
+	Cron     *cron.CronServiceDefault
+	Storage  storage.StorageService
+	Accounts *account.AccountServiceDefault
+	Metadata metadata.MetadataService
+	Protocol *S5Protocol
+}
+
+func NewTusHandler(params TusHandlerParams) *TusHandler {
+	return &TusHandler{
+		config:   params.Config,
+		db:       params.Db,
+		logger:   params.Logger,
+		cron:     params.Cron,
+		storage:  params.Storage,
+		accounts: params.Accounts,
+		metadata: params.Metadata,
+		protocol: params.Protocol,
+	}
+}
+
+func (t *TusHandler) Init() error {
+	preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) {
+		blankResp := tusd.HTTPResponse{}
+		blankChanges := tusd.FileInfoChanges{}
+
+		hash, ok := 
hook.Upload.MetaData["hash"] + if !ok { + return blankResp, blankChanges, errors.New("missing hash") + } + + decodedHash, err := encoding.MultihashFromBase64Url(hash) + + if err != nil { + return blankResp, blankChanges, err + } + + upload, err := t.metadata.GetUpload(hook.Context, decodedHash.HashBytes()) + + if !upload.IsEmpty() { + if err != nil && !errors.Is(err, metadata.ErrNotFound) { + return blankResp, blankChanges, err + } + return blankResp, blankChanges, errors.New("file already exists") + } + + exists, _ := t.TusUploadExists(decodedHash.HashBytes()) + + if exists { + return blankResp, blankChanges, errors.New("file is already being uploaded") + } + + return blankResp, blankChanges, nil + } + + customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + if service == s3.ServiceID { + return aws.Endpoint{ + URL: t.config.GetString("core.storage.s3.endpoint"), + SigningRegion: t.config.GetString("core.storage.s3.region"), + }, nil + } + return aws.Endpoint{}, &aws.EndpointNotFoundError{} + }) + + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion("us-east-1"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + t.config.GetString("core.storage.s3.accessKey"), + t.config.GetString("core.storage.s3.secretKey"), + "", + )), + config.WithEndpointResolverWithOptions(customResolver), + ) + if err != nil { + return err + } + + s3Client := s3.NewFromConfig(cfg) + + store := s3store.New(t.config.GetString("core.storage.s3.bufferBucket"), s3Client) + + locker := NewMySQLLocker(t.db, t.logger) + + composer := tusd.NewStoreComposer() + store.UseIn(composer) + composer.UseLocker(locker) + + handler, err := tusd.NewHandler(tusd.Config{ + BasePath: "/s5/upload/tus", + StoreComposer: composer, + DisableDownload: true, + NotifyCompleteUploads: true, + NotifyTerminatedUploads: true, + NotifyCreatedUploads: true, + RespectForwardedHeaders: true, + PreUploadCreateCallback: preUpload, + }) + + if err != nil { + return err + } + + t.tus = handler + t.tusStore = store + t.s3Client = s3Client + + t.cron.RegisterService(t) + return nil +} + +func (t *TusHandler) LoadInitialTasks(cron cron.CronService) error { + return nil +} + +func (t *TusHandler) Tus() *tusd.Handler { + return t.tus +} + +func (t *TusHandler) TusUploadExists(hash []byte) (bool, models.TusUpload) { + hashStr := hex.EncodeToString(hash) + + var upload models.TusUpload + result := t.db.Model(&models.TusUpload{}).Where(&models.TusUpload{Hash: hashStr}).First(&upload) + + return result.RowsAffected > 0, upload +} + +func (t *TusHandler) CreateTusUpload(hash []byte, uploadID string, uploaderID uint, uploaderIP string, protocol string) (*models.TusUpload, error) { + hashStr := hex.EncodeToString(hash) + + upload := &models.TusUpload{ + Hash: hashStr, + UploadID: uploadID, + UploaderID: uploaderID, + UploaderIP: uploaderIP, + Uploader: models.User{}, + Protocol: protocol, + } + + result := t.db.Create(upload) + + if result.Error != nil { + return nil, result.Error + } + + return upload, nil +} +func (t *TusHandler) TusUploadProgress(uploadID string) error { + + find := &models.TusUpload{UploadID: uploadID} + + var upload models.TusUpload + result := t.db.Model(&models.TusUpload{}).Where(find).First(&upload) + + if result.RowsAffected == 0 { + return errors.New("upload not found") + } + + result = t.db.Model(&models.TusUpload{}).Where(find).Update("updated_at", time.Now()) + + if result.Error != nil { + return result.Error + } 
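+	// Only updated_at is refreshed here; upload byte offsets are tracked by the tus data store itself.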
+ + return nil +} +func (t *TusHandler) TusUploadCompleted(uploadID string) error { + + find := &models.TusUpload{UploadID: uploadID} + + var upload models.TusUpload + result := t.db.Model(&models.TusUpload{}).Where(find).First(&upload) + + if result.RowsAffected == 0 { + return errors.New("upload not found") + } + + result = t.db.Model(&models.TusUpload{}).Where(find).Update("completed", true) + + return nil +} +func (t *TusHandler) DeleteTusUpload(uploadID string) error { + result := t.db.Where(&models.TusUpload{UploadID: uploadID}).Delete(&models.TusUpload{}) + + if result.Error != nil { + return result.Error + } + + return nil +} + +func (t *TusHandler) ScheduleTusUpload(uploadID string) error { + find := &models.TusUpload{UploadID: uploadID} + + var upload models.TusUpload + result := t.db.Model(&models.TusUpload{}).Where(find).First(&upload) + + if result.RowsAffected == 0 { + return errors.New("upload not found") + } + + task := t.cron.RetryableTask(cron.RetryableTaskParams{ + Name: "tusUpload", + Function: t.tusUploadTask, + Args: []interface{}{&upload}, + Attempt: 0, + Limit: 0, + After: func(jobID uuid.UUID, jobName string) { + t.logger.Info("Job finished", zap.String("jobName", jobName), zap.String("uploadID", uploadID)) + err := t.DeleteTusUpload(uploadID) + if err != nil { + t.logger.Error("Error deleting tus upload", zap.Error(err)) + } + }, + }) + + _, err := t.cron.CreateJob(task) + + if err != nil { + return err + } + return nil +} + +func (t *TusHandler) GetTusUploadReader(hash []byte, start int64) (io.ReadCloser, error) { + ctx := context.Background() + exists, upload := t.TusUploadExists(hash) + + if !exists { + return nil, metadata.ErrNotFound + } + + meta, err := t.tusStore.GetUpload(ctx, upload.UploadID) + if err != nil { + return nil, err + } + + info, err := meta.GetInfo(ctx) + if err != nil { + return nil, err + } + + if start > 0 { + endPosition := start + info.Size - 1 + rangeHeader := fmt.Sprintf("bytes=%d-%d", start, endPosition) + ctx = context.WithValue(ctx, "range", rangeHeader) + } + + reader, err := meta.GetReader(ctx) + if err != nil { + return nil, err + } + + return reader, nil +} + +func (t *TusHandler) tusUploadTask(upload *models.TusUpload) error { + ctx := context.Background() + tusUpload, err := t.tusStore.GetUpload(ctx, upload.UploadID) + if err != nil { + t.logger.Error("Could not get upload", zap.Error(err)) + return err + } + + readers := make([]io.ReadCloser, 0) + getReader := func() (io.Reader, error) { + muReader, err := tusUpload.GetReader(ctx) + if err != nil { + return nil, err + } + + readers = append(readers, muReader) + return muReader, nil + } + + defer func() { + for _, reader := range readers { + err := reader.Close() + if err != nil { + t.logger.Error("error closing reader", zap.Error(err)) + } + } + }() + + reader, err := getReader() + if err != nil { + t.logger.Error("Could not get tus file", zap.Error(err)) + return err + } + + proof, err := t.storage.HashObject(ctx, reader) + + if err != nil { + t.logger.Error("Could not compute proof", zap.Error(err)) + return err + } + + dbHash, err := hex.DecodeString(upload.Hash) + + if err != nil { + t.logger.Error("Could not decode proof", zap.Error(err)) + return err + } + + if !bytes.Equal(proof.Hash, dbHash) { + t.logger.Error("Hashes do not match", zap.Any("upload", upload), zap.Any("proof", proof), zap.Any("dbHash", dbHash)) + return err + } + + info, err := tusUpload.GetInfo(context.Background()) + if err != nil { + t.logger.Error("Could not get tus info", zap.Error(err)) + return 
err + } + + storageProtocol := GetStorageProtocol(t.protocol) + + uploadMeta, err := t.storage.UploadObject(ctx, storageProtocol, nil, &renter.MultiPartUploadParams{ + ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) { + rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end) + ctx = context.WithValue(ctx, "range", rangeHeader) + return tusUpload.GetReader(ctx) + }, + Bucket: upload.Protocol, + FileName: "/" + storageProtocol.EncodeFileName(dbHash), + Size: uint64(info.Size), + }, proof) + + if err != nil { + t.logger.Error("Could not upload file", zap.Error(err)) + return err + } + + s3InfoId, _ := splitS3Ids(upload.UploadID) + + _, err = t.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(t.config.GetString("core.storage.s3.bufferBucket")), + Delete: &s3types.Delete{ + Objects: []s3types.ObjectIdentifier{ + { + Key: aws.String(s3InfoId), + }, + { + Key: aws.String(s3InfoId + ".info"), + }, + }, + Quiet: aws.Bool(true), + }, + }) + + if err != nil { + t.logger.Error("Could not delete upload metadata", zap.Error(err)) + return err + } + + uploadMeta.UserID = upload.UploaderID + uploadMeta.UploaderIP = upload.UploaderIP + + err = t.metadata.SaveUpload(ctx, *uploadMeta) + if err != nil { + t.logger.Error("Could not create upload", zap.Error(err)) + return err + } + + err = t.accounts.PinByHash(dbHash, upload.UploaderID) + if err != nil { + t.logger.Error("Could not pin upload", zap.Error(err)) + return err + } + + return nil +} +func splitS3Ids(id string) (objectId, multipartId string) { + index := strings.Index(id, "+") + if index == -1 { + return + } + + objectId = id[:index] + multipartId = id[index+1:] + return +} +func GetStorageProtocol(proto *S5Protocol) storage.StorageProtocol { + return interface{}(proto).(storage.StorageProtocol) +} diff --git a/storage/file.go b/storage/file.go deleted file mode 100644 index f61c4cc..0000000 --- a/storage/file.go +++ /dev/null @@ -1,210 +0,0 @@ -package storage - -import ( - "context" - "encoding/hex" - "errors" - "io" - "time" - - "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/portal/db/models" - "git.lumeweb.com/LumeWeb/portal/renter" - "go.sia.tech/renterd/api" -) - -type FileImpl struct { - reader io.ReadCloser - hash []byte - storage *StorageServiceDefault - renter *renter.RenterDefault - record *models.Upload - cid *encoding.CID - read bool -} - -type FileParams struct { - Storage *StorageServiceDefault - Renter *renter.RenterDefault - Hash []byte -} - -func NewFile(params FileParams) *FileImpl { - return &FileImpl{hash: params.Hash, storage: params.Storage, renter: params.Renter, read: false} -} - -func (f *FileImpl) Exists() bool { - exists, _ := f.storage.FileExists(f.hash) - - return exists -} - -func (f *FileImpl) Read(p []byte) (n int, err error) { - err = f.init(0) - if err != nil { - return 0, err - } - f.read = true - - return f.reader.Read(p) -} - -func (f *FileImpl) Seek(offset int64, whence int) (int64, error) { - switch whence { - case io.SeekStart: - if !f.read { - return 0, nil - } - - if f.reader != nil { - err := f.reader.Close() - if err != nil { - return 0, err - } - f.reader = nil - } - err := f.init(offset) - if err != nil { - return 0, err - } - case io.SeekCurrent: - return 0, errors.New("not supported") - case io.SeekEnd: - return int64(f.Size()), nil - default: - return 0, errors.New("invalid whence") - } - - return 0, nil -} - -func (f *FileImpl) Close() error { - if f.reader != nil { - r := f.reader - f.reader = 
nil - return r.Close() - } - - return nil -} - -func (f *FileImpl) init(offset int64) error { - if f.reader == nil { - reader, _, err := f.storage.GetFile(f.hash, offset) - if err != nil { - return err - } - - f.reader = reader - f.read = false - } - - return nil -} - -func (f *FileImpl) Record() (*models.Upload, error) { - if f.record == nil { - exists, record := f.storage.FileExists(f.hash) - - if !exists { - return nil, errors.New("file does not exist") - } - - f.record = record - } - - return f.record, nil -} - -func (f *FileImpl) Hash() []byte { - hashStr := f.HashString() - - if hashStr == "" { - return nil - } - - str, err := hex.DecodeString(hashStr) - if err != nil { - return nil - } - - return str -} - -func (f *FileImpl) HashString() string { - record, err := f.Record() - if err != nil { - return "" - } - - return record.Hash -} - -func (f *FileImpl) Name() string { - cid, _ := f.CID().ToString() - - return cid -} - -func (f *FileImpl) Modtime() time.Time { - record, err := f.Record() - if err != nil { - return time.Unix(0, 0) - } - - return record.CreatedAt -} -func (f *FileImpl) Size() uint64 { - record, err := f.Record() - if err != nil { - return 0 - } - - return record.Size -} -func (f *FileImpl) CID() *encoding.CID { - if f.cid == nil { - multihash := encoding.MultihashFromBytes(f.Hash(), types.HashTypeBlake3) - cid := encoding.NewCID(types.CIDTypeRaw, *multihash, f.Size()) - f.cid = cid - } - return f.cid -} - -func (f *FileImpl) Mime() string { - record, err := f.Record() - if err != nil { - return "" - } - - return record.MimeType -} - -func (f *FileImpl) Protocol() string { - record, err := f.Record() - if err != nil { - return "" - } - - return record.Protocol -} -func (f *FileImpl) Proof() ([]byte, error) { - object, err := f.renter.GetObject(context.Background(), f.Protocol(), f.HashString(), api.DownloadObjectOptions{}) - - if err != nil { - return nil, err - } - - proof, err := io.ReadAll(object.Content) - if err != nil { - return nil, err - } - - err = object.Content.Close() - if err != nil { - return nil, err - } - - return proof, nil -} diff --git a/storage/storage.go b/storage/storage.go index 590bc35..a108bf4 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,685 +3,228 @@ package storage import ( "bytes" "context" - "encoding/hex" - "errors" "fmt" "io" "net/http" - "strings" - "time" - "git.lumeweb.com/LumeWeb/portal/api/middleware" - - "git.lumeweb.com/LumeWeb/libs5-go/encoding" - "git.lumeweb.com/LumeWeb/libs5-go/types" - "git.lumeweb.com/LumeWeb/portal/account" - "git.lumeweb.com/LumeWeb/portal/bao" - "git.lumeweb.com/LumeWeb/portal/cron" - "git.lumeweb.com/LumeWeb/portal/db/models" - "git.lumeweb.com/LumeWeb/portal/renter" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" - s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" - "github.com/google/uuid" - "github.com/spf13/viper" - tusd "github.com/tus/tusd/v2/pkg/handler" - "github.com/tus/tusd/v2/pkg/s3store" - "go.sia.tech/renterd/api" "go.uber.org/fx" + + "go.sia.tech/renterd/api" + + "git.lumeweb.com/LumeWeb/portal/metadata" + "go.uber.org/zap" + + "git.lumeweb.com/LumeWeb/portal/bao" + + "github.com/spf13/viper" "gorm.io/gorm" + + "git.lumeweb.com/LumeWeb/portal/renter" ) -type TusPreUploadCreateCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) -type TusPreFinishResponseCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, error) +const 
PROOF_EXTENSION = ".obao" -type StorageServiceParams struct { - fx.In - Config *viper.Viper - Logger *zap.Logger - Db *gorm.DB - Accounts *account.AccountServiceDefault - Cron *cron.CronServiceDefault - Renter *renter.RenterDefault +var _ StorageService = (*StorageServiceDefault)(nil) + +type FileNameEncoderFunc func([]byte) string + +type StorageProtocol interface { + Name() string + EncodeFileName([]byte) string } var Module = fx.Module("storage", fx.Provide( NewStorageService, + fx.As(new(StorageService)), ), - fx.Invoke(func(s *StorageServiceDefault) error { - return s.init() - }), ) +type StorageService interface { + UploadObject(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, muParams *renter.MultiPartUploadParams, proof *bao.Result) (*metadata.UploadMetadata, error) + UploadObjectProof(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, proof *bao.Result) error + HashObject(ctx context.Context, data io.Reader) (*bao.Result, error) + DownloadObject(ctx context.Context, protocol StorageProtocol, objectHash []byte, start int64) (io.ReadCloser, error) + DownloadObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) (io.ReadCloser, error) + DeleteObject(ctx context.Context, protocol StorageProtocol, objectHash []byte) error + DeleteObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) error +} + type StorageServiceDefault struct { - tus *tusd.Handler - tusStore tusd.DataStore - s3Client *s3.Client config *viper.Viper - logger *zap.Logger db *gorm.DB - accounts *account.AccountServiceDefault - cron *cron.CronServiceDefault renter *renter.RenterDefault + logger *zap.Logger + metadata metadata.MetadataService } -func (s *StorageServiceDefault) Tus() *tusd.Handler { - return s.tus +type StorageServiceParams struct { + Config *viper.Viper + Db *gorm.DB + Renter *renter.RenterDefault + Logger *zap.Logger + metadata metadata.MetadataService } -func (s *StorageServiceDefault) Start() error { - return nil +func NewStorageService(params StorageServiceParams) *StorageServiceDefault { + return &StorageServiceDefault{ + config: params.Config, + db: params.Db, + renter: params.Renter, + } } -func NewStorageService(lc fx.Lifecycle, params StorageServiceParams) *StorageServiceDefault { - ss := &StorageServiceDefault{ - config: params.Config, - logger: params.Logger, - db: params.Db, - accounts: params.Accounts, - cron: params.Cron, - renter: params.Renter, - } - - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - go ss.tusWorker() - return nil - }, - }) - - return ss -} - -func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, userId uint, userIp string) (*models.Upload, error) { - hashResult, len, err := s.GetHashSmall(file) - if err != nil { - return nil, err - } - - exists, upload := s.FileExists(hashResult.Hash) - if exists { - return upload, nil - } - - // Re-seek the file to the beginning after hashing - _, err = file.Seek(0, io.SeekStart) - if err != nil { - return nil, err - } - - hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hashResult.Hash)).ToBase64Url() - if err != nil { - return nil, err - } - - _, err = file.Seek(0, io.SeekStart) - if err != nil { - return nil, err - } - - raw, err := io.ReadAll(file) - if err != nil { - return nil, err - } - - _, err = file.Seek(0, io.SeekStart) - if err != nil { - return nil, err - } - - mimeType := http.DetectContentType(raw) - - err = s.renter.CreateBucketIfNotExists(bucket) - if err != nil { - return nil, err - } - - err = 
s.renter.UploadObject(context.Background(), file, bucket, hashStr) - - if err != nil { - return nil, err - } - - err = s.renter.UploadObject(context.Background(), bytes.NewReader(hashResult.Proof), bucket, fmt.Sprintf("%s.bao", hashStr)) - - if err != nil { - return nil, err - } - - upload, err = s.CreateUpload(hashResult.Hash, mimeType, userId, userIp, uint64(len), bucket) - if err != nil { - return nil, err - } - - return upload, nil -} - -func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, *s3.Client, error) { - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { - if service == s3.ServiceID { - return aws.Endpoint{ - URL: s.config.GetString("core.storage.s3.endpoint"), - SigningRegion: s.config.GetString("core.storage.s3.region"), - }, nil +func (s StorageServiceDefault) UploadObject(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, muParams *renter.MultiPartUploadParams, proof *bao.Result) (*metadata.UploadMetadata, error) { + readers := make([]io.ReadCloser, 0) + defer func() { + for _, reader := range readers { + err := reader.Close() + if err != nil { + s.logger.Error("error closing reader", zap.Error(err)) + } } - return aws.Endpoint{}, &aws.EndpointNotFoundError{} - }) + }() - cfg, err := config.LoadDefaultConfig(context.TODO(), - config.WithRegion("us-east-1"), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( - s.config.GetString("core.storage.s3.accessKey"), - s.config.GetString("core.storage.s3.secretKey"), - "", - )), - config.WithEndpointResolverWithOptions(customResolver), - ) - if err != nil { - return nil, nil, nil, nil - } + getReader := func() (io.Reader, error) { + if muParams != nil { + muReader, err := muParams.ReaderFactory(0, uint(muParams.Size)) + if err != nil { + return nil, err + } - s3Client := s3.NewFromConfig(cfg) + found := false + for _, reader := range readers { + if reader == muReader { + found = true + break + } + } - store := s3store.New(s.config.GetString("core.storage.s3.bufferBucket"), s3Client) - - locker := NewMySQLLocker(s.db, s.logger) - - composer := tusd.NewStoreComposer() - store.UseIn(composer) - composer.UseLocker(locker) - - handler, err := tusd.NewHandler(tusd.Config{ - BasePath: basePath, - StoreComposer: composer, - DisableDownload: true, - NotifyCompleteUploads: true, - NotifyTerminatedUploads: true, - NotifyCreatedUploads: true, - RespectForwardedHeaders: true, - PreUploadCreateCallback: preUploadCb, - }) - - return handler, store, s3Client, err -} - -func (s *StorageServiceDefault) init() error { - preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) { - blankResp := tusd.HTTPResponse{} - blankChanges := tusd.FileInfoChanges{} - - hash, ok := hook.Upload.MetaData["hash"] - if !ok { - return blankResp, blankChanges, errors.New("missing hash") + if !found { + readers = append(readers, muReader) + } } - decodedHash, err := encoding.MultihashFromBase64Url(hash) - + _, err := data.Seek(0, io.SeekStart) if err != nil { - return blankResp, blankChanges, err + return nil, err } - exists, _ := s.FileExists(decodedHash.HashBytes()) - - if exists { - return blankResp, blankChanges, errors.New("file already exists") - } - - exists, _ = s.TusUploadExists(decodedHash.HashBytes()) - - if exists { - return blankResp, blankChanges, errors.New("file is already being 
uploaded") - } - - return blankResp, blankChanges, nil + return data, nil } - tus, store, s3client, err := s.BuildUploadBufferTus("/s5/upload/tus", preUpload, nil) - + reader, err := getReader() if err != nil { - return err + return nil, err } - s.tus = tus - s.tusStore = store - s.s3Client = s3client - - s.cron.RegisterService(s) - - return nil -} -func (s *StorageServiceDefault) LoadInitialTasks(cron cron.CronService) error { - return nil -} - -func (s *StorageServiceDefault) FileExists(hash []byte) (bool, *models.Upload) { - hashStr := hex.EncodeToString(hash) - - var upload models.Upload - result := s.db.Model(&models.Upload{}).Where(&models.Upload{Hash: hashStr}).First(&upload) - - return result.RowsAffected > 0, &upload -} - -func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) (*bao.Result, int, error) { - buf := bytes.NewBuffer(nil) - - _, err := io.Copy(buf, file) - if err != nil { - return nil, 0, err - } - - result, _, err := bao.Hash(buf) - if err != nil { - return nil, 0, err - } - - return result, buf.Len(), nil -} -func (s *StorageServiceDefault) GetHash(file io.Reader) (*bao.Result, int, error) { - hash, totalBytes, err := bao.Hash(file) - - if err != nil { - return nil, 0, err - } - - return hash, totalBytes, nil -} - -func (s *StorageServiceDefault) CreateUpload(hash []byte, mime string, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error) { - hashStr := hex.EncodeToString(hash) - - upload := &models.Upload{ - Hash: hashStr, - MimeType: mime, - UserID: uploaderID, - UploaderIP: uploaderIP, - Protocol: protocol, - Size: size, - } - - result := s.db.Create(upload) - - if result.Error != nil { - return nil, result.Error - } - - return upload, nil -} -func (s *StorageServiceDefault) tusWorker() { - - for { - select { - case info := <-s.tus.CreatedUploads: - hash, ok := info.Upload.MetaData["hash"] - errorResponse := tusd.HTTPResponse{StatusCode: 400, Header: nil} - if !ok { - s.logger.Error("Missing hash in metadata") - continue - } - - uploaderID, ok := info.Context.Value(middleware.DEFAULT_AUTH_CONTEXT_KEY).(uint64) - if !ok { - errorResponse.Body = "Missing user id in context" - info.Upload.StopUpload(errorResponse) - s.logger.Error("Missing user id in context") - continue - } - - uploaderIP := info.HTTPRequest.RemoteAddr - - decodedHash, err := encoding.MultihashFromBase64Url(hash) - - if err != nil { - errorResponse.Body = "Could not decode hash" - info.Upload.StopUpload(errorResponse) - s.logger.Error("Could not decode hash", zap.Error(err)) - continue - } - - _, err = s.CreateTusUpload(decodedHash.HashBytes(), info.Upload.ID, uint(uploaderID), uploaderIP, info.Context.Value("protocol").(string)) - if err != nil { - errorResponse.Body = "Could not create tus upload" - info.Upload.StopUpload(errorResponse) - s.logger.Error("Could not create tus upload", zap.Error(err)) - continue - } - case info := <-s.tus.UploadProgress: - err := s.TusUploadProgress(info.Upload.ID) - if err != nil { - s.logger.Error("Could not update tus upload", zap.Error(err)) - continue - } - case info := <-s.tus.TerminatedUploads: - err := s.DeleteTusUpload(info.Upload.ID) - if err != nil { - s.logger.Error("Could not delete tus upload", zap.Error(err)) - continue - } - - case info := <-s.tus.CompleteUploads: - if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) { - continue - } - err := s.TusUploadCompleted(info.Upload.ID) - if err != nil { - s.logger.Error("Could not complete tus upload", zap.Error(err)) - continue - } - err 
= s.ScheduleTusUpload(info.Upload.ID) - if err != nil { - s.logger.Error("Could not schedule tus upload", zap.Error(err)) - continue - } - - } - } -} - -func (s *StorageServiceDefault) TusUploadExists(hash []byte) (bool, models.TusUpload) { - hashStr := hex.EncodeToString(hash) - - var upload models.TusUpload - result := s.db.Model(&models.TusUpload{}).Where(&models.TusUpload{Hash: hashStr}).First(&upload) - - return result.RowsAffected > 0, upload -} - -func (s *StorageServiceDefault) CreateTusUpload(hash []byte, uploadID string, uploaderID uint, uploaderIP string, protocol string) (*models.TusUpload, error) { - hashStr := hex.EncodeToString(hash) - - upload := &models.TusUpload{ - Hash: hashStr, - UploadID: uploadID, - UploaderID: uploaderID, - UploaderIP: uploaderIP, - Uploader: models.User{}, - Protocol: protocol, - } - - result := s.db.Create(upload) - - if result.Error != nil { - return nil, result.Error - } - - return upload, nil -} -func (s *StorageServiceDefault) TusUploadProgress(uploadID string) error { - - find := &models.TusUpload{UploadID: uploadID} - - var upload models.TusUpload - result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload) - - if result.RowsAffected == 0 { - return errors.New("upload not found") - } - - result = s.db.Model(&models.TusUpload{}).Where(find).Update("updated_at", time.Now()) - - if result.Error != nil { - return result.Error - } - - return nil -} -func (s *StorageServiceDefault) TusUploadCompleted(uploadID string) error { - - find := &models.TusUpload{UploadID: uploadID} - - var upload models.TusUpload - result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload) - - if result.RowsAffected == 0 { - return errors.New("upload not found") - } - - result = s.db.Model(&models.TusUpload{}).Where(find).Update("completed", true) - - return nil -} -func (s *StorageServiceDefault) DeleteTusUpload(uploadID string) error { - result := s.db.Where(&models.TusUpload{UploadID: uploadID}).Delete(&models.TusUpload{}) - - if result.Error != nil { - return result.Error - } - - return nil -} - -func (s *StorageServiceDefault) ScheduleTusUpload(uploadID string) error { - find := &models.TusUpload{UploadID: uploadID} - - var upload models.TusUpload - result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload) - - if result.RowsAffected == 0 { - return errors.New("upload not found") - } - - task := s.cron.RetryableTask(cron.RetryableTaskParams{ - Name: "tusUpload", - Function: s.tusUploadTask, - Args: []interface{}{&upload}, - Attempt: 0, - Limit: 0, - After: func(jobID uuid.UUID, jobName string) { - s.logger.Info("Job finished", zap.String("jobName", jobName), zap.String("uploadID", uploadID)) - err := s.DeleteTusUpload(uploadID) - if err != nil { - s.logger.Error("Error deleting tus upload", zap.Error(err)) - } - }, - }) - - _, err := s.cron.CreateJob(task) - - if err != nil { - return err - } - return nil -} - -func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error { - ctx := context.Background() - tusUpload, err := s.tusStore.GetUpload(ctx, upload.UploadID) - if err != nil { - s.logger.Error("Could not get upload", zap.Error(err)) - return err - } - - readerHash, err := tusUpload.GetReader(ctx) - if err != nil { - s.logger.Error("Could not get tus file", zap.Error(err)) - return err - } - - defer func(reader io.ReadCloser) { - err := reader.Close() + if proof == nil { + hashResult, err := s.HashObject(ctx, reader) if err != nil { - s.logger.Error("Could not close reader", zap.Error(err)) + return nil, err } - 
}(readerHash)
-
+		reader, err = getReader()
+		if err != nil {
+			return nil, err
+		}
+
+		proof = hashResult
+	}
+
-	hash, byteCount, err := s.GetHash(readerHash)
-
-	if err != nil {
-		s.logger.Error("Could not compute hash", zap.Error(err))
-		return err
-	}
-
-	dbHash, err := hex.DecodeString(upload.Hash)
-
-	if err != nil {
-		s.logger.Error("Could not decode hash", zap.Error(err))
-		return err
-	}
-
-	if !bytes.Equal(hash.Hash, dbHash) {
-		s.logger.Error("Hashes do not match", zap.Any("upload", upload), zap.Any("hash", hash), zap.Any("dbHash", dbHash))
-		return err
-	}
-
-	readerMime, err := tusUpload.GetReader(ctx)
-	if err != nil {
-		s.logger.Error("Could not get tus file", zap.Error(err))
-		return err
-	}
-
-	defer func(reader io.ReadCloser) {
-		err := reader.Close()
-		if err != nil {
-			s.logger.Error("Could not close reader", zap.Error(err))
-		}
-	}(readerMime)
-
-	var mimeBuf [512]byte
-
-	_, err = readerMime.Read(mimeBuf[:])
-	if err != nil {
-		s.logger.Error("Could not read mime", zap.Error(err))
-		return err
-	}
-
-	mimeType := http.DetectContentType(mimeBuf[:])
-
-	upload.MimeType = mimeType
-
-	if tx := s.db.Save(upload); tx.Error != nil {
-		s.logger.Error("Could not update tus upload", zap.Error(tx.Error))
-		return tx.Error
-	}
-
-	hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash.Hash)).ToBase64Url()
-	err = s.renter.CreateBucketIfNotExists(upload.Protocol)
-	if err != nil {
-		return err
-	}
-
-	info, err := tusUpload.GetInfo(context.Background())
-	if err != nil {
-		s.logger.Error("Could not get tus info", zap.Error(err))
-		return err
-	}
-
-	err = s.renter.MultipartUpload(renter.MultiPartUploadParams{
-		ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
-			rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
-			ctx = context.WithValue(ctx, "range", rangeHeader)
-			return tusUpload.GetReader(ctx)
-		},
-		Bucket:   upload.Protocol,
-		FileName: "/" + hashStr,
-		Size:     uint64(info.Size),
-	})
-
-	if err != nil {
-		s.logger.Error("Could not upload file", zap.Error(err))
-		return err
-	}
-
-	err = s.renter.UploadObject(context.Background(), bytes.NewReader(hash.Proof), upload.Protocol, fmt.Sprintf("%s.bao", hashStr))
-
-	if err != nil {
-		return err
-	}
-
-	s3InfoId, _ := splitS3Ids(upload.UploadID)
-
-	_, err = s.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
-		Bucket: aws.String(s.config.GetString("core.storage.s3.bufferBucket")),
-		Delete: &s3types.Delete{
-			Objects: []s3types.ObjectIdentifier{
-				{
-					Key: aws.String(s3InfoId),
-				},
-				{
-					Key: aws.String(s3InfoId + ".info"),
-				},
-			},
-			Quiet: aws.Bool(true),
-		},
-	})
-
-	if err != nil {
-		s.logger.Error("Could not delete upload metadata", zap.Error(err))
-		return err
-	}
-
-	newUpload, err := s.CreateUpload(dbHash, mimeType, upload.UploaderID, upload.UploaderIP, uint64(byteCount), upload.Protocol)
-	if err != nil {
-		s.logger.Error("Could not create upload", zap.Error(err))
-		return err
-	}
-
-	err = s.accounts.PinByID(newUpload.ID, upload.UploaderID)
-	if err != nil {
-		s.logger.Error("Could not pin upload", zap.Error(err))
-		return err
-	}
-
-	return nil
-}
-
-func (s *StorageServiceDefault) getPrefixedHash(hash []byte) []byte {
-	return append([]byte{byte(types.HashTypeBlake3)}, hash...)
-}
-
-func splitS3Ids(id string) (objectId, multipartId string) {
-	index := strings.Index(id, "+")
-	if index == -1 {
-		return
-	}
-
-	objectId = id[:index]
-	multipartId = id[index+1:]
-	return
-}
-
-func (s *StorageServiceDefault) GetFile(hash []byte, start int64) (io.ReadCloser, int64, error) {
-	if exists, tusUpload := s.TusUploadExists(hash); exists {
-		if tusUpload.Completed {
-			upload, err := s.tusStore.GetUpload(context.Background(), tusUpload.UploadID)
-			if err != nil {
-				return nil, 0, err
-			}
-
-			info, _ := upload.GetInfo(context.Background())
-
-			ctx := context.Background()
-
-			if start > 0 {
-				endPosition := start + info.Size - 1
-				rangeHeader := fmt.Sprintf("bytes=%d-%d", start, endPosition)
-				ctx = context.WithValue(ctx, "range", rangeHeader)
-			}
-
-			reader, err := upload.GetReader(ctx)
-
-			return reader, info.Size, err
-		}
-	}
-
-	exists, upload := s.FileExists(hash)
-
-	if !exists {
-		return nil, 0, errors.New("file does not exist")
-	}
-
-	hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
-	if err != nil {
-		return nil, 0, err
-	}
-
-	var partialRange api.DownloadRange
-
-	if start > 0 {
-		partialRange = api.DownloadRange{
-			Offset: start,
-			Length: int64(upload.Size) - start,
-		}
-	}
-
-	object, err := s.renter.GetObject(context.Background(), upload.Protocol, hashStr, api.DownloadObjectOptions{
-		Range: partialRange,
-	})
-
-	if err != nil {
-		return nil, 0, err
-	}
-
-	return object.Content, int64(upload.Size), nil
-}
-
-func (s *StorageServiceDefault) NewFile(hash []byte) *FileImpl {
-	return NewFile(FileParams{
-		Storage: s,
-		Renter:  s.renter,
-		Hash:    hash,
-	})
-}
+	// Sniff the MIME type from the object's first 512 bytes. Reading from the
+	// shared reader (not data) also covers the multipart path, where data is
+	// nil, and a short object is not an error here.
+	mimeBytes := make([]byte, 512)
+	_, err = io.ReadFull(reader, mimeBytes)
+	if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
+		return nil, err
+	}
+
+	reader, err = getReader()
+	if err != nil {
+		return nil, err
+	}
+
+	mimeType := http.DetectContentType(mimeBytes)
+
+	protocolName := protocol.Name()
+
+	err = s.renter.CreateBucketIfNotExists(protocolName)
+	if err != nil {
+		return nil, err
+	}
+
+	filename := protocol.EncodeFileName(proof.Hash)
+
+	err = s.UploadObjectProof(ctx, protocol, nil, proof)
+	if err != nil {
+		return nil, err
+	}
+
+	if muParams != nil {
+		muParams.FileName = filename
+		muParams.Bucket = protocolName
+
+		err = s.renter.UploadObjectMultipart(ctx, muParams)
+		if err != nil {
+			return nil, err
+		}
+
+		return &metadata.UploadMetadata{
+			Protocol: protocolName,
+			Hash:     proof.Hash,
+			MimeType: mimeType,
+			Size:     uint64(proof.Length),
+		}, nil
+	}
+
+	err = s.renter.UploadObject(ctx, reader, protocolName, filename)
+	if err != nil {
+		return nil, err
+	}
+
+	return &metadata.UploadMetadata{
+		Protocol: protocolName,
+		Hash:     proof.Hash,
+		MimeType: mimeType,
+		Size:     uint64(proof.Length),
+	}, nil
+}
+
+func (s StorageServiceDefault) UploadObjectProof(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, proof *bao.Result) error {
+	if proof == nil {
+		hashResult, err := s.HashObject(ctx, data)
+		if err != nil {
+			return err
+		}
+
+		proof = hashResult
+	}
+
+	protocolName := protocol.Name()
+
+	err := s.renter.CreateBucketIfNotExists(protocolName)
+
+	if err != nil {
+		return err
+	}
+
+	return s.renter.UploadObject(ctx, bytes.NewReader(proof.Proof), protocolName, s.getProofPath(protocol, proof.Hash))
+}
+
+func (s StorageServiceDefault) HashObject(ctx context.Context, data io.Reader) (*bao.Result, error) {
+	result, err := bao.Hash(data)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
+func (s StorageServiceDefault) DownloadObject(ctx context.Context, protocol StorageProtocol, objectHash []byte, start int64) (io.ReadCloser, error) {
+	var partialRange api.DownloadRange
+
+	upload, err := s.metadata.GetUpload(ctx, objectHash)
+	if err != nil {
+		return nil, err
+	}
+
+	if start > 0 {
+		partialRange = api.DownloadRange{
+			Offset: start,
+			Length: int64(upload.Size) - start,
+		}
+	}
+
+	object, err := s.renter.GetObject(ctx, protocol.Name(), protocol.EncodeFileName(objectHash), api.DownloadObjectOptions{Range: partialRange})
+	if err != nil {
+		return nil, err
+	}
+
+	return object.Content, nil
+}
+
+func (s StorageServiceDefault) DownloadObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) (io.ReadCloser, error) {
+	object, err := s.renter.GetObject(ctx, protocol.Name(), s.getProofPath(protocol, objectHash), api.DownloadObjectOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	return object.Content, nil
+}
+
+func (s StorageServiceDefault) DeleteObject(ctx context.Context, protocol StorageProtocol, objectHash []byte) error {
+	err := s.renter.DeleteObject(ctx, protocol.Name(), protocol.EncodeFileName(objectHash))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s StorageServiceDefault) DeleteObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) error {
+	err := s.renter.DeleteObject(ctx, protocol.Name(), s.getProofPath(protocol, objectHash))
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// getProofPath derives the object key for a stored proof: the encoded file
+// name plus PROOF_EXTENSION, which already carries the leading dot. Uploads
+// and downloads both use this helper so proof keys stay consistent.
+func (s StorageServiceDefault) getProofPath(protocol StorageProtocol, objectHash []byte) string {
+	return fmt.Sprintf("%s%s", protocol.EncodeFileName(objectHash), PROOF_EXTENSION)
+}
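
Usage sketch (illustrative, not part of the diff): how a protocol plugs into the
refactored storage API. Only the StorageProtocol and StorageService shapes from
storage/storage.go above are assumed; demoProtocol and upload are hypothetical names.

package demo

import (
	"context"
	"encoding/hex"
	"io"

	"git.lumeweb.com/LumeWeb/portal/storage"
)

// demoProtocol is a hypothetical protocol that satisfies storage.StorageProtocol:
// Name doubles as the renter bucket, and EncodeFileName turns a raw blake3 hash
// into an object key.
type demoProtocol struct{}

func (demoProtocol) Name() string { return "demo" }

func (demoProtocol) EncodeFileName(hash []byte) string {
	return hex.EncodeToString(hash)
}

// upload pushes a small object through the non-multipart path. Passing nil for
// muParams and proof makes the service hash the data itself and store the bao
// proof next to the object; attributing and persisting the returned metadata
// (UserID, UploaderIP, SaveUpload) stays with the caller, as api/s5 does.
func upload(ctx context.Context, svc storage.StorageService, data io.ReadSeeker) error {
	meta, err := svc.UploadObject(ctx, demoProtocol{}, data, nil, nil)
	if err != nil {
		return err
	}

	rc, err := svc.DownloadObject(ctx, demoProtocol{}, meta.Hash, 0)
	if err != nil {
		return err
	}

	return rc.Close()
}

GetStorageProtocol in protocols/s5/tus.go plays the same role for the S5 protocol,
so the tus path and S5File reads share one bucket and key scheme per protocol.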