refactor: epic protocol and storage design refactor
This commit is contained in:
parent
c534162d6c
commit
93e727ab3b
|
@ -0,0 +1,234 @@
|
||||||
|
package s5
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/metadata"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/storage"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ io.ReadSeekCloser = (*S5File)(nil)
|
||||||
|
|
||||||
|
type S5File struct {
|
||||||
|
reader io.ReadCloser
|
||||||
|
hash []byte
|
||||||
|
storage storage.StorageService
|
||||||
|
metadata metadata.MetadataService
|
||||||
|
record *metadata.UploadMetadata
|
||||||
|
protocol *s5.S5Protocol
|
||||||
|
cid *encoding.CID
|
||||||
|
read bool
|
||||||
|
tus *s5.TusHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
type FileParams struct {
|
||||||
|
Storage storage.StorageService
|
||||||
|
Metadata metadata.MetadataService
|
||||||
|
Hash []byte
|
||||||
|
Protocol *s5.S5Protocol
|
||||||
|
Tus *s5.TusHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFile(params FileParams) *S5File {
|
||||||
|
return &S5File{
|
||||||
|
storage: params.Storage,
|
||||||
|
metadata: params.Metadata,
|
||||||
|
hash: params.Hash,
|
||||||
|
protocol: params.Protocol,
|
||||||
|
tus: params.Tus,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Exists() bool {
|
||||||
|
_, err := f.metadata.GetUpload(context.Background(), f.hash)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Read(p []byte) (n int, err error) {
|
||||||
|
err = f.init(0)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
f.read = true
|
||||||
|
|
||||||
|
return f.reader.Read(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
switch whence {
|
||||||
|
case io.SeekStart:
|
||||||
|
if !f.read {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if f.reader != nil {
|
||||||
|
err := f.reader.Close()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
f.reader = nil
|
||||||
|
}
|
||||||
|
err := f.init(offset)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
case io.SeekCurrent:
|
||||||
|
return 0, errors.New("not supported")
|
||||||
|
case io.SeekEnd:
|
||||||
|
return int64(f.Size()), nil
|
||||||
|
default:
|
||||||
|
return 0, errors.New("invalid whence")
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Close() error {
|
||||||
|
if f.reader != nil {
|
||||||
|
r := f.reader
|
||||||
|
f.reader = nil
|
||||||
|
return r.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) init(offset int64) error {
|
||||||
|
if f.reader == nil {
|
||||||
|
|
||||||
|
reader, err := f.tus.GetTusUploadReader(f.hash, offset)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
f.reader = reader
|
||||||
|
f.read = false
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err = f.storage.DownloadObject(context.Background(), f.StorageProtocol(), f.hash, offset)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
f.reader = reader
|
||||||
|
f.read = false
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Record() (*metadata.UploadMetadata, error) {
|
||||||
|
if f.record == nil {
|
||||||
|
record, err := f.metadata.GetUpload(context.Background(), f.Hash())
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.New("file does not exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
f.record = &record
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.record, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Hash() []byte {
|
||||||
|
hashStr := f.HashString()
|
||||||
|
|
||||||
|
if hashStr == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
str, err := hex.DecodeString(hashStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return str
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) HashString() string {
|
||||||
|
record, err := f.Record()
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return hex.EncodeToString(record.Hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Name() string {
|
||||||
|
cid, _ := f.CID().ToString()
|
||||||
|
|
||||||
|
return cid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Modtime() time.Time {
|
||||||
|
record, err := f.Record()
|
||||||
|
if err != nil {
|
||||||
|
return time.Unix(0, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return record.Created
|
||||||
|
}
|
||||||
|
func (f *S5File) Size() uint64 {
|
||||||
|
record, err := f.Record()
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return record.Size
|
||||||
|
}
|
||||||
|
func (f *S5File) CID() *encoding.CID {
|
||||||
|
if f.cid == nil {
|
||||||
|
multihash := encoding.MultihashFromBytes(f.Hash(), types.HashTypeBlake3)
|
||||||
|
cid := encoding.NewCID(types.CIDTypeRaw, *multihash, f.Size())
|
||||||
|
f.cid = cid
|
||||||
|
}
|
||||||
|
return f.cid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Mime() string {
|
||||||
|
record, err := f.Record()
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return record.MimeType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) StorageProtocol() storage.StorageProtocol {
|
||||||
|
return s5.GetStorageProtocol(f.protocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *S5File) Proof() ([]byte, error) {
|
||||||
|
object, err := f.storage.DownloadObjectProof(context.Background(), f.StorageProtocol(), f.hash)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
proof, err := io.ReadAll(object)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = object.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return proof, nil
|
||||||
|
}
|
170
api/s5/s5.go
170
api/s5/s5.go
|
@ -21,8 +21,14 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/metadata"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/storage"
|
||||||
|
|
||||||
|
"github.com/getkin/kin-openapi/openapi3"
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/metadata"
|
s5libmetadata "git.lumeweb.com/LumeWeb/libs5-go/metadata"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/node"
|
"git.lumeweb.com/LumeWeb/libs5-go/node"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
|
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/service"
|
"git.lumeweb.com/LumeWeb/libs5-go/service"
|
||||||
|
@ -43,8 +49,6 @@ import (
|
||||||
"git.lumeweb.com/LumeWeb/portal/api/registry"
|
"git.lumeweb.com/LumeWeb/portal/api/registry"
|
||||||
protoRegistry "git.lumeweb.com/LumeWeb/portal/protocols/registry"
|
protoRegistry "git.lumeweb.com/LumeWeb/portal/protocols/registry"
|
||||||
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
|
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
|
||||||
"git.lumeweb.com/LumeWeb/portal/storage"
|
|
||||||
"github.com/getkin/kin-openapi/openapi3"
|
|
||||||
"github.com/rs/cors"
|
"github.com/rs/cors"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"go.sia.tech/jape"
|
"go.sia.tech/jape"
|
||||||
|
@ -67,12 +71,13 @@ type S5API struct {
|
||||||
config *viper.Viper
|
config *viper.Viper
|
||||||
identity ed25519.PrivateKey
|
identity ed25519.PrivateKey
|
||||||
accounts *account.AccountServiceDefault
|
accounts *account.AccountServiceDefault
|
||||||
storage *storage.StorageServiceDefault
|
storage storage.StorageService
|
||||||
|
metadata metadata.MetadataService
|
||||||
db *gorm.DB
|
db *gorm.DB
|
||||||
protocols []protoRegistry.Protocol
|
protocols []protoRegistry.Protocol
|
||||||
httpHandler HttpHandler
|
|
||||||
protocol *s5.S5Protocol
|
protocol *s5.S5Protocol
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
|
tusHandler *s5.TusHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
type APIParams struct {
|
type APIParams struct {
|
||||||
|
@ -80,10 +85,11 @@ type APIParams struct {
|
||||||
Config *viper.Viper
|
Config *viper.Viper
|
||||||
Identity ed25519.PrivateKey
|
Identity ed25519.PrivateKey
|
||||||
Accounts *account.AccountServiceDefault
|
Accounts *account.AccountServiceDefault
|
||||||
Storage *storage.StorageServiceDefault
|
Storage storage.StorageService
|
||||||
Db *gorm.DB
|
Db *gorm.DB
|
||||||
Protocols []protoRegistry.Protocol `group:"protocol"`
|
Protocols []protoRegistry.Protocol `group:"protocol"`
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
|
TusHandler *s5.TusHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
type S5ApiResult struct {
|
type S5ApiResult struct {
|
||||||
|
@ -101,6 +107,7 @@ func NewS5(params APIParams) (S5ApiResult, error) {
|
||||||
db: params.Db,
|
db: params.Db,
|
||||||
protocols: params.Protocols,
|
protocols: params.Protocols,
|
||||||
logger: params.Logger,
|
logger: params.Logger,
|
||||||
|
tusHandler: params.TusHandler,
|
||||||
}
|
}
|
||||||
return S5ApiResult{
|
return S5ApiResult{
|
||||||
API: api,
|
API: api,
|
||||||
|
@ -146,7 +153,7 @@ func (s *S5API) Routes() *httprouter.Router {
|
||||||
|
|
||||||
authMw := authMiddleware(authMiddlewareOpts)
|
authMw := authMiddleware(authMiddlewareOpts)
|
||||||
|
|
||||||
tusHandler := BuildS5TusApi(authMw, s.storage)
|
tusHandler := BuildS5TusApi(authMw, s.tusHandler)
|
||||||
|
|
||||||
tusOptionsHandler := func(c jape.Context) {
|
tusOptionsHandler := func(c jape.Context) {
|
||||||
c.ResponseWriter.WriteHeader(http.StatusOK)
|
c.ResponseWriter.WriteHeader(http.StatusOK)
|
||||||
|
@ -291,10 +298,10 @@ func BuildTusCors() func(h http.Handler) http.Handler {
|
||||||
return mw.Handler
|
return mw.Handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func BuildS5TusApi(authMw middleware.HttpMiddlewareFunc, storage *storage.StorageServiceDefault) jape.Handler {
|
func BuildS5TusApi(authMw middleware.HttpMiddlewareFunc, handler *s5.TusHandler) jape.Handler {
|
||||||
// Create a jape.Handler for your tusHandler
|
// Create a jape.Handler for your tusHandler
|
||||||
tusJapeHandler := func(c jape.Context) {
|
tusJapeHandler := func(c jape.Context) {
|
||||||
tusHandler := storage.Tus()
|
tusHandler := handler.Tus()
|
||||||
tusHandler.ServeHTTP(c.ResponseWriter, c.Request)
|
tusHandler.ServeHTTP(c.ResponseWriter, c.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,45 +341,6 @@ func (rsnc readSeekNopCloser) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type HttpHandler struct {
|
|
||||||
config *viper.Viper
|
|
||||||
logger *zap.Logger
|
|
||||||
storage *storage.StorageServiceDefault
|
|
||||||
db *gorm.DB
|
|
||||||
accounts *account.AccountServiceDefault
|
|
||||||
protocol *s5.S5Protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
type HttpHandlerParams struct {
|
|
||||||
fx.In
|
|
||||||
|
|
||||||
Config *viper.Viper
|
|
||||||
Logger *zap.Logger
|
|
||||||
Storage *storage.StorageServiceDefault
|
|
||||||
Db *gorm.DB
|
|
||||||
Accounts *account.AccountServiceDefault
|
|
||||||
Protocol *s5.S5Protocol
|
|
||||||
}
|
|
||||||
|
|
||||||
type HttpHandlerResult struct {
|
|
||||||
fx.Out
|
|
||||||
|
|
||||||
HttpHandler HttpHandler
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHttpHandler(params HttpHandlerParams) (HttpHandlerResult, error) {
|
|
||||||
return HttpHandlerResult{
|
|
||||||
HttpHandler: HttpHandler{
|
|
||||||
config: params.Config,
|
|
||||||
logger: params.Logger,
|
|
||||||
storage: params.Storage,
|
|
||||||
db: params.Db,
|
|
||||||
accounts: params.Accounts,
|
|
||||||
protocol: params.Protocol,
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *S5API) smallFileUpload(jc jape.Context) {
|
func (s *S5API) smallFileUpload(jc jape.Context) {
|
||||||
user := middleware.GetUserFromContext(jc.Request.Context())
|
user := middleware.GetUserFromContext(jc.Request.Context())
|
||||||
|
|
||||||
|
@ -388,8 +356,18 @@ func (s *S5API) smallFileUpload(jc jape.Context) {
|
||||||
}
|
}
|
||||||
}(file)
|
}(file)
|
||||||
|
|
||||||
// Use PutFileSmall for the actual file upload
|
newUpload, err2 := s.storage.UploadObject(jc.Request.Context(), s5.GetStorageProtocol(s.protocol), file, nil, nil)
|
||||||
newUpload, err2 := s.storage.PutFileSmall(file, "s5", user, jc.Request.RemoteAddr)
|
|
||||||
|
if err2 != nil {
|
||||||
|
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
newUpload.UserID = user
|
||||||
|
newUpload.UploaderIP = jc.Request.RemoteAddr
|
||||||
|
|
||||||
|
err2 = s.metadata.SaveUpload(jc.Request.Context(), *newUpload)
|
||||||
|
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
|
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
|
||||||
return
|
return
|
||||||
|
@ -721,7 +699,6 @@ func (s *S5API) accountStats(jc jape.Context) {
|
||||||
func (s *S5API) accountPins(jc jape.Context) {
|
func (s *S5API) accountPins(jc jape.Context) {
|
||||||
var cursor uint64
|
var cursor uint64
|
||||||
if err := jc.DecodeForm("cursor", &cursor); err != nil {
|
if err := jc.DecodeForm("cursor", &cursor); err != nil {
|
||||||
// Assuming jc.DecodeForm sends out its own error, so no need for further action here
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -784,10 +761,7 @@ func (s *S5API) accountPin(jc jape.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := hex.EncodeToString(decodedCid.Hash.HashBytes())
|
if err := s.accounts.PinByHash(decodedCid.Hash.HashBytes(), userID); err != nil {
|
||||||
s.logger.Info("Processing pin request", zap.String("cid", cid), zap.String("hash", hash))
|
|
||||||
|
|
||||||
if err := s.accounts.PinByHash(hash, userID); err != nil {
|
|
||||||
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -818,32 +792,32 @@ func (s *S5API) directoryUpload(jc jape.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
user := middleware.GetUserFromContext(jc.Request.Context())
|
uploads, err := s.processMultipartFiles(jc.Request)
|
||||||
uploads, err := s.processMultipartFiles(jc.Request, user)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.sendErrorResponse(jc, err) // processMultipartFiles should return a properly wrapped S5Error
|
s.sendErrorResponse(jc, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate metadata for the directory upload
|
// Generate metadata for the directory upload
|
||||||
app, err := s.createAppMetadata(name, tryFiles, errorPages, uploads)
|
app, err := s.createAppMetadata(name, tryFiles, errorPages, uploads)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.sendErrorResponse(jc, err) // createAppMetadata should return a properly wrapped S5Error
|
s.sendErrorResponse(jc, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upload the metadata
|
// Upload the metadata
|
||||||
cidStr, err := s.uploadAppMetadata(app, user, jc.Request)
|
cidStr, err := s.uploadAppMetadata(app, jc.Request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.sendErrorResponse(jc, err) // uploadAppMetadata should return a properly wrapped S5Error
|
s.sendErrorResponse(jc, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
jc.Encode(&AppUploadResponse{CID: cidStr})
|
jc.Encode(&AppUploadResponse{CID: cidStr})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S5API) processMultipartFiles(r *http.Request, user uint) (map[string]*models.Upload, error) {
|
func (s *S5API) processMultipartFiles(r *http.Request) (map[string]*metadata.UploadMetadata, error) {
|
||||||
uploadMap := make(map[string]*models.Upload)
|
uploadMap := make(map[string]*metadata.UploadMetadata)
|
||||||
|
user := middleware.GetUserFromContext(r.Context())
|
||||||
|
|
||||||
for _, files := range r.MultipartForm.File {
|
for _, files := range r.MultipartForm.File {
|
||||||
for _, fileHeader := range files {
|
for _, fileHeader := range files {
|
||||||
|
@ -858,7 +832,15 @@ func (s *S5API) processMultipartFiles(r *http.Request, user uint) (map[string]*m
|
||||||
}
|
}
|
||||||
}(file)
|
}(file)
|
||||||
|
|
||||||
upload, err := s.storage.PutFileSmall(file, "s5", user, r.RemoteAddr)
|
upload, err := s.storage.UploadObject(r.Context(), s5.GetStorageProtocol(s.protocol), file, nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
upload.UserID = user
|
||||||
|
upload.UploaderIP = r.RemoteAddr
|
||||||
|
|
||||||
|
err = s.metadata.SaveUpload(r.Context(), *upload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
|
return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
|
||||||
}
|
}
|
||||||
|
@ -870,20 +852,17 @@ func (s *S5API) processMultipartFiles(r *http.Request, user uint) (map[string]*m
|
||||||
return uploadMap, nil
|
return uploadMap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map[int]string, uploads map[string]*models.Upload) (*metadata.WebAppMetadata, error) {
|
func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map[int]string, uploads map[string]*metadata.UploadMetadata) (*s5libmetadata.WebAppMetadata, error) {
|
||||||
filesMap := make(map[string]metadata.WebAppMetadataFileReference, len(uploads))
|
filesMap := make(map[string]s5libmetadata.WebAppMetadataFileReference, len(uploads))
|
||||||
|
|
||||||
for filename, upload := range uploads {
|
for filename, upload := range uploads {
|
||||||
hashDecoded, err := hex.DecodeString(upload.Hash)
|
hash := upload.Hash
|
||||||
if err != nil {
|
|
||||||
return nil, NewS5Error(ErrKeyInternalError, err, "Failed to decode hash for file: "+filename)
|
|
||||||
}
|
|
||||||
|
|
||||||
cid, err := encoding.CIDFromHash(hashDecoded, upload.Size, types.CIDTypeRaw, types.HashTypeBlake3)
|
cid, err := encoding.CIDFromHash(hash, upload.Size, types.CIDTypeRaw, types.HashTypeBlake3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, NewS5Error(ErrKeyInternalError, err, "Failed to create CID for file: "+filename)
|
return nil, NewS5Error(ErrKeyInternalError, err, "Failed to create CID for file: "+filename)
|
||||||
}
|
}
|
||||||
filesMap[filename] = metadata.WebAppMetadataFileReference{
|
filesMap[filename] = s5libmetadata.WebAppMetadataFileReference{
|
||||||
Cid: cid,
|
Cid: cid,
|
||||||
ContentType: upload.MimeType,
|
ContentType: upload.MimeType,
|
||||||
}
|
}
|
||||||
|
@ -894,9 +873,9 @@ func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map
|
||||||
extraMetadataMap[statusCode] = page
|
extraMetadataMap[statusCode] = page
|
||||||
}
|
}
|
||||||
|
|
||||||
extraMetadata := metadata.NewExtraMetadata(extraMetadataMap)
|
extraMetadata := s5libmetadata.NewExtraMetadata(extraMetadataMap)
|
||||||
// Create the web app metadata object
|
// Create the web app metadata object
|
||||||
app := metadata.NewWebAppMetadata(
|
app := s5libmetadata.NewWebAppMetadata(
|
||||||
name,
|
name,
|
||||||
tryFiles,
|
tryFiles,
|
||||||
*extraMetadata,
|
*extraMetadata,
|
||||||
|
@ -907,27 +886,37 @@ func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages map
|
||||||
return app, nil
|
return app, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S5API) uploadAppMetadata(appData *metadata.WebAppMetadata, userId uint, r *http.Request) (string, error) {
|
func (s *S5API) uploadAppMetadata(appData *s5libmetadata.WebAppMetadata, r *http.Request) (string, *S5Error) {
|
||||||
|
userId := middleware.GetUserFromContext(r.Context())
|
||||||
|
|
||||||
appDataRaw, err := msgpack.Marshal(appData)
|
appDataRaw, err := msgpack.Marshal(appData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", NewS5Error(ErrKeyInternalError, err, "Failed to marshal app metadata")
|
return "", NewS5Error(ErrKeyInternalError, err, "Failed to marshal app s5libmetadata")
|
||||||
}
|
}
|
||||||
|
|
||||||
file := bytes.NewReader(appDataRaw)
|
file := bytes.NewReader(appDataRaw)
|
||||||
|
|
||||||
upload, err := s.storage.PutFileSmall(file, "s5", userId, r.RemoteAddr)
|
upload, err := s.storage.UploadObject(r.Context(), s5.GetStorageProtocol(s.protocol), file, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", NewS5Error(ErrKeyStorageOperationFailed, err)
|
return "", NewS5Error(ErrKeyStorageOperationFailed, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Construct the CID for the newly uploaded metadata
|
upload.UserID = userId
|
||||||
|
upload.UploaderIP = r.RemoteAddr
|
||||||
|
|
||||||
|
err = s.metadata.SaveUpload(r.Context(), *upload)
|
||||||
|
if err != nil {
|
||||||
|
return "", NewS5Error(ErrKeyStorageOperationFailed, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct the CID for the newly uploaded s5libmetadata
|
||||||
cid, err := encoding.CIDFromHash(upload.Hash, uint64(len(appDataRaw)), types.CIDTypeMetadataWebapp, types.HashTypeBlake3)
|
cid, err := encoding.CIDFromHash(upload.Hash, uint64(len(appDataRaw)), types.CIDTypeMetadataWebapp, types.HashTypeBlake3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", NewS5Error(ErrKeyInternalError, err, "Failed to create CID for new app metadata")
|
return "", NewS5Error(ErrKeyInternalError, err, "Failed to create CID for new app s5libmetadata")
|
||||||
}
|
}
|
||||||
cidStr, err := cid.ToString()
|
cidStr, err := cid.ToString()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", NewS5Error(ErrKeyInternalError, err, "Failed to convert CID to string for new app metadata")
|
return "", NewS5Error(ErrKeyInternalError, err, "Failed to convert CID to string for new app s5libmetadata")
|
||||||
}
|
}
|
||||||
|
|
||||||
return cidStr, nil
|
return cidStr, nil
|
||||||
|
@ -1294,7 +1283,7 @@ func (s *S5API) downloadMetadata(jc jape.Context) {
|
||||||
|
|
||||||
switch cidDecoded.Type {
|
switch cidDecoded.Type {
|
||||||
case types.CIDTypeRaw:
|
case types.CIDTypeRaw:
|
||||||
_ = jc.Error(errors.New("Raw CIDs do not have metadata"), http.StatusBadRequest)
|
_ = jc.Error(errors.New("Raw CIDs do not have s5libmetadata"), http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
|
|
||||||
case types.CIDTypeResolver:
|
case types.CIDTypeResolver:
|
||||||
|
@ -1304,8 +1293,8 @@ func (s *S5API) downloadMetadata(jc jape.Context) {
|
||||||
|
|
||||||
meta, err := s.getNode().Services().Storage().GetMetadataByCID(cidDecoded)
|
meta, err := s.getNode().Services().Storage().GetMetadataByCID(cidDecoded)
|
||||||
|
|
||||||
if jc.Check("error getting metadata", err) != nil {
|
if jc.Check("error getting s5libmetadata", err) != nil {
|
||||||
s.logger.Error("error getting metadata", zap.Error(err))
|
s.logger.Error("error getting s5libmetadata", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1329,9 +1318,9 @@ func (s *S5API) downloadFile(jc jape.Context) {
|
||||||
var hashBytes []byte
|
var hashBytes []byte
|
||||||
isProof := false
|
isProof := false
|
||||||
|
|
||||||
if strings.HasSuffix(cid, ".obao") {
|
if strings.HasSuffix(cid, storage.PROOF_EXTENSION) {
|
||||||
isProof = true
|
isProof = true
|
||||||
cid = strings.TrimSuffix(cid, ".obao")
|
cid = strings.TrimSuffix(cid, storage.PROOF_EXTENSION)
|
||||||
}
|
}
|
||||||
|
|
||||||
cidDecoded, err := encoding.CIDFromString(cid)
|
cidDecoded, err := encoding.CIDFromString(cid)
|
||||||
|
@ -1348,7 +1337,7 @@ func (s *S5API) downloadFile(jc jape.Context) {
|
||||||
hashBytes = cidDecoded.Hash.HashBytes()
|
hashBytes = cidDecoded.Hash.HashBytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
file := s.storage.NewFile(hashBytes)
|
file := s.newFile(s.protocol, hashBytes)
|
||||||
|
|
||||||
if !file.Exists() {
|
if !file.Exists() {
|
||||||
jc.ResponseWriter.WriteHeader(http.StatusNotFound)
|
jc.ResponseWriter.WriteHeader(http.StatusNotFound)
|
||||||
|
@ -1413,6 +1402,15 @@ func (s *S5API) newStorageLocationProvider(hash *encoding.Multihash, types ...ty
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *S5API) newFile(protocol *s5.S5Protocol, hash []byte) *S5File {
|
||||||
|
return NewFile(FileParams{
|
||||||
|
Protocol: protocol,
|
||||||
|
Hash: hash,
|
||||||
|
Metadata: s.metadata,
|
||||||
|
Storage: s.storage,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func setAuthCookie(jwt string, jc jape.Context) {
|
func setAuthCookie(jwt string, jc jape.Context) {
|
||||||
authCookie := http.Cookie{
|
authCookie := http.Cookie{
|
||||||
Name: "s5-auth-token",
|
Name: "s5-auth-token",
|
||||||
|
|
|
@ -1,14 +1,15 @@
|
||||||
package storage
|
package s5
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/db/models"
|
"git.lumeweb.com/LumeWeb/portal/db/models"
|
||||||
tusd "github.com/tus/tusd/v2/pkg/handler"
|
tusd "github.com/tus/tusd/v2/pkg/handler"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"os"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -17,7 +18,6 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
type MySQLLocker struct {
|
type MySQLLocker struct {
|
||||||
storage *StorageServiceDefault
|
|
||||||
AcquirerPollInterval time.Duration
|
AcquirerPollInterval time.Duration
|
||||||
HolderPollInterval time.Duration
|
HolderPollInterval time.Duration
|
||||||
db *gorm.DB
|
db *gorm.DB
|
|
@ -3,7 +3,14 @@ package s5
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/metadata"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/storage"
|
||||||
|
|
||||||
s5config "git.lumeweb.com/LumeWeb/libs5-go/config"
|
s5config "git.lumeweb.com/LumeWeb/libs5-go/config"
|
||||||
s5ed "git.lumeweb.com/LumeWeb/libs5-go/ed25519"
|
s5ed "git.lumeweb.com/LumeWeb/libs5-go/ed25519"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
|
@ -12,12 +19,10 @@ import (
|
||||||
s5storage "git.lumeweb.com/LumeWeb/libs5-go/storage"
|
s5storage "git.lumeweb.com/LumeWeb/libs5-go/storage"
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
"git.lumeweb.com/LumeWeb/portal/protocols/registry"
|
"git.lumeweb.com/LumeWeb/portal/protocols/registry"
|
||||||
"git.lumeweb.com/LumeWeb/portal/storage"
|
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -28,18 +33,20 @@ var (
|
||||||
type S5Protocol struct {
|
type S5Protocol struct {
|
||||||
config *viper.Viper
|
config *viper.Viper
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
storage *storage.StorageServiceDefault
|
storage storage.StorageService
|
||||||
identity ed25519.PrivateKey
|
identity ed25519.PrivateKey
|
||||||
node *s5node.Node
|
node *s5node.Node
|
||||||
|
tusHandler *TusHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
type S5ProtocolParams struct {
|
type S5ProtocolParams struct {
|
||||||
fx.In
|
fx.In
|
||||||
Config *viper.Viper
|
Config *viper.Viper
|
||||||
Logger *zap.Logger
|
Logger *zap.Logger
|
||||||
Storage *storage.StorageServiceDefault
|
Storage storage.StorageService
|
||||||
Identity ed25519.PrivateKey
|
Identity ed25519.PrivateKey
|
||||||
ProviderStore *S5ProviderStore
|
ProviderStore *S5ProviderStore
|
||||||
|
TusHandler *TusHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
type S5ProtocolResult struct {
|
type S5ProtocolResult struct {
|
||||||
|
@ -50,8 +57,17 @@ type S5ProtocolResult struct {
|
||||||
Db *bolt.DB
|
Db *bolt.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type S5ProviderStoreParams struct {
|
||||||
|
fx.In
|
||||||
|
Config *viper.Viper
|
||||||
|
Metadata metadata.MetadataService
|
||||||
|
Logger *zap.Logger
|
||||||
|
Tus *TusHandler
|
||||||
|
}
|
||||||
|
|
||||||
var ProtocolModule = fx.Module("s5_api",
|
var ProtocolModule = fx.Module("s5_api",
|
||||||
fx.Provide(NewS5Protocol),
|
fx.Provide(NewS5Protocol),
|
||||||
|
fx.Provide(NewTusHandler),
|
||||||
fx.Provide(NewS5ProviderStore),
|
fx.Provide(NewS5ProviderStore),
|
||||||
fx.Replace(func(cfg *s5config.NodeConfig) *zap.Logger {
|
fx.Replace(func(cfg *s5config.NodeConfig) *zap.Logger {
|
||||||
return cfg.Logger
|
return cfg.Logger
|
||||||
|
@ -67,6 +83,7 @@ func NewS5Protocol(
|
||||||
logger: params.Logger,
|
logger: params.Logger,
|
||||||
storage: params.Storage,
|
storage: params.Storage,
|
||||||
identity: params.Identity,
|
identity: params.Identity,
|
||||||
|
tusHandler: params.TusHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg, err := ConfigureS5Protocol(params)
|
cfg, err := ConfigureS5Protocol(params)
|
||||||
|
@ -136,11 +153,12 @@ func ConfigureS5Protocol(params S5ProtocolParams) (*s5config.NodeConfig, error)
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewS5ProviderStore(config *viper.Viper, logger *zap.Logger, storage *storage.StorageServiceDefault) *S5ProviderStore {
|
func NewS5ProviderStore(params S5ProviderStoreParams) *S5ProviderStore {
|
||||||
return &S5ProviderStore{
|
return &S5ProviderStore{
|
||||||
config: config,
|
config: params.Config,
|
||||||
logger: logger,
|
logger: params.Logger,
|
||||||
storage: storage,
|
tus: params.Tus,
|
||||||
|
metadata: params.Metadata,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,6 +183,11 @@ func (s *S5Protocol) Init(args ...any) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = s.tusHandler.Init()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (s *S5Protocol) Start(ctx context.Context) error {
|
func (s *S5Protocol) Start(ctx context.Context) error {
|
||||||
|
@ -199,7 +222,8 @@ func (s *S5Protocol) Stop(ctx context.Context) error {
|
||||||
type S5ProviderStore struct {
|
type S5ProviderStore struct {
|
||||||
config *viper.Viper
|
config *viper.Viper
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
storage *storage.StorageServiceDefault
|
tus *TusHandler
|
||||||
|
metadata metadata.MetadataService
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s S5ProviderStore) CanProvide(hash *encoding.Multihash, kind []types.StorageLocationType) bool {
|
func (s S5ProviderStore) CanProvide(hash *encoding.Multihash, kind []types.StorageLocationType) bool {
|
||||||
|
@ -208,13 +232,13 @@ func (s S5ProviderStore) CanProvide(hash *encoding.Multihash, kind []types.Stora
|
||||||
case types.StorageLocationTypeArchive, types.StorageLocationTypeFile, types.StorageLocationTypeFull:
|
case types.StorageLocationTypeArchive, types.StorageLocationTypeFile, types.StorageLocationTypeFull:
|
||||||
rawHash := hash.HashBytes()
|
rawHash := hash.HashBytes()
|
||||||
|
|
||||||
if exists, upload := s.storage.TusUploadExists(rawHash); exists {
|
if exists, upload := s.tus.TusUploadExists(rawHash); exists {
|
||||||
if upload.Completed {
|
if upload.Completed {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
if exists, _ := s.storage.FileExists(rawHash); exists {
|
if _, err := s.metadata.GetUpload(context.Background(), rawHash); errors.Is(err, metadata.ErrNotFound) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,444 @@
|
||||||
|
package s5
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/account"
|
||||||
|
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/metadata"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go-v2/config"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||||
|
"github.com/tus/tusd/v2/pkg/s3store"
|
||||||
|
|
||||||
|
tusd "github.com/tus/tusd/v2/pkg/handler"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/storage"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/cron"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/db/models"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/renter"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||||
|
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ cron.CronableService = (*TusHandler)(nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
type TusHandler struct {
|
||||||
|
config *viper.Viper
|
||||||
|
db *gorm.DB
|
||||||
|
logger *zap.Logger
|
||||||
|
cron *cron.CronServiceDefault
|
||||||
|
storage storage.StorageService
|
||||||
|
accounts *account.AccountServiceDefault
|
||||||
|
metadata metadata.MetadataService
|
||||||
|
tus *tusd.Handler
|
||||||
|
tusStore tusd.DataStore
|
||||||
|
s3Client *s3.Client
|
||||||
|
protocol *S5Protocol
|
||||||
|
}
|
||||||
|
|
||||||
|
type TusHandlerParams struct {
|
||||||
|
Config *viper.Viper
|
||||||
|
Logger *zap.Logger
|
||||||
|
Db *gorm.DB
|
||||||
|
Cron *cron.CronServiceDefault
|
||||||
|
Storage storage.StorageService
|
||||||
|
Accounts *account.AccountServiceDefault
|
||||||
|
Metadata metadata.MetadataService
|
||||||
|
Protocol *S5Protocol
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTusHandler(params TusHandlerParams) *TusHandler {
|
||||||
|
return &TusHandler{
|
||||||
|
config: params.Config,
|
||||||
|
db: params.Db,
|
||||||
|
logger: params.Logger,
|
||||||
|
cron: params.Cron,
|
||||||
|
storage: params.Storage,
|
||||||
|
accounts: params.Accounts,
|
||||||
|
metadata: params.Metadata,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) Init() error {
|
||||||
|
preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) {
|
||||||
|
blankResp := tusd.HTTPResponse{}
|
||||||
|
blankChanges := tusd.FileInfoChanges{}
|
||||||
|
|
||||||
|
hash, ok := hook.Upload.MetaData["hash"]
|
||||||
|
if !ok {
|
||||||
|
return blankResp, blankChanges, errors.New("missing hash")
|
||||||
|
}
|
||||||
|
|
||||||
|
decodedHash, err := encoding.MultihashFromBase64Url(hash)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return blankResp, blankChanges, err
|
||||||
|
}
|
||||||
|
|
||||||
|
upload, err := t.metadata.GetUpload(hook.Context, decodedHash.HashBytes())
|
||||||
|
|
||||||
|
if !upload.IsEmpty() {
|
||||||
|
if err != nil && !errors.Is(err, metadata.ErrNotFound) {
|
||||||
|
return blankResp, blankChanges, err
|
||||||
|
}
|
||||||
|
return blankResp, blankChanges, errors.New("file already exists")
|
||||||
|
}
|
||||||
|
|
||||||
|
exists, _ := t.TusUploadExists(decodedHash.HashBytes())
|
||||||
|
|
||||||
|
if exists {
|
||||||
|
return blankResp, blankChanges, errors.New("file is already being uploaded")
|
||||||
|
}
|
||||||
|
|
||||||
|
return blankResp, blankChanges, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||||
|
if service == s3.ServiceID {
|
||||||
|
return aws.Endpoint{
|
||||||
|
URL: t.config.GetString("core.storage.s3.endpoint"),
|
||||||
|
SigningRegion: t.config.GetString("core.storage.s3.region"),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
|
||||||
|
})
|
||||||
|
|
||||||
|
cfg, err := config.LoadDefaultConfig(context.TODO(),
|
||||||
|
config.WithRegion("us-east-1"),
|
||||||
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
||||||
|
t.config.GetString("core.storage.s3.accessKey"),
|
||||||
|
t.config.GetString("core.storage.s3.secretKey"),
|
||||||
|
"",
|
||||||
|
)),
|
||||||
|
config.WithEndpointResolverWithOptions(customResolver),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s3Client := s3.NewFromConfig(cfg)
|
||||||
|
|
||||||
|
store := s3store.New(t.config.GetString("core.storage.s3.bufferBucket"), s3Client)
|
||||||
|
|
||||||
|
locker := NewMySQLLocker(t.db, t.logger)
|
||||||
|
|
||||||
|
composer := tusd.NewStoreComposer()
|
||||||
|
store.UseIn(composer)
|
||||||
|
composer.UseLocker(locker)
|
||||||
|
|
||||||
|
handler, err := tusd.NewHandler(tusd.Config{
|
||||||
|
BasePath: "/s5/upload/tus",
|
||||||
|
StoreComposer: composer,
|
||||||
|
DisableDownload: true,
|
||||||
|
NotifyCompleteUploads: true,
|
||||||
|
NotifyTerminatedUploads: true,
|
||||||
|
NotifyCreatedUploads: true,
|
||||||
|
RespectForwardedHeaders: true,
|
||||||
|
PreUploadCreateCallback: preUpload,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.tus = handler
|
||||||
|
t.tusStore = store
|
||||||
|
t.s3Client = s3Client
|
||||||
|
|
||||||
|
t.cron.RegisterService(t)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) LoadInitialTasks(cron cron.CronService) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) Tus() *tusd.Handler {
|
||||||
|
return t.tus
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) TusUploadExists(hash []byte) (bool, models.TusUpload) {
|
||||||
|
hashStr := hex.EncodeToString(hash)
|
||||||
|
|
||||||
|
var upload models.TusUpload
|
||||||
|
result := t.db.Model(&models.TusUpload{}).Where(&models.TusUpload{Hash: hashStr}).First(&upload)
|
||||||
|
|
||||||
|
return result.RowsAffected > 0, upload
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) CreateTusUpload(hash []byte, uploadID string, uploaderID uint, uploaderIP string, protocol string) (*models.TusUpload, error) {
|
||||||
|
hashStr := hex.EncodeToString(hash)
|
||||||
|
|
||||||
|
upload := &models.TusUpload{
|
||||||
|
Hash: hashStr,
|
||||||
|
UploadID: uploadID,
|
||||||
|
UploaderID: uploaderID,
|
||||||
|
UploaderIP: uploaderIP,
|
||||||
|
Uploader: models.User{},
|
||||||
|
Protocol: protocol,
|
||||||
|
}
|
||||||
|
|
||||||
|
result := t.db.Create(upload)
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
return nil, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return upload, nil
|
||||||
|
}
|
||||||
|
func (t *TusHandler) TusUploadProgress(uploadID string) error {
|
||||||
|
|
||||||
|
find := &models.TusUpload{UploadID: uploadID}
|
||||||
|
|
||||||
|
var upload models.TusUpload
|
||||||
|
result := t.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
||||||
|
|
||||||
|
if result.RowsAffected == 0 {
|
||||||
|
return errors.New("upload not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
result = t.db.Model(&models.TusUpload{}).Where(find).Update("updated_at", time.Now())
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (t *TusHandler) TusUploadCompleted(uploadID string) error {
|
||||||
|
|
||||||
|
find := &models.TusUpload{UploadID: uploadID}
|
||||||
|
|
||||||
|
var upload models.TusUpload
|
||||||
|
result := t.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
||||||
|
|
||||||
|
if result.RowsAffected == 0 {
|
||||||
|
return errors.New("upload not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
result = t.db.Model(&models.TusUpload{}).Where(find).Update("completed", true)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (t *TusHandler) DeleteTusUpload(uploadID string) error {
|
||||||
|
result := t.db.Where(&models.TusUpload{UploadID: uploadID}).Delete(&models.TusUpload{})
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) ScheduleTusUpload(uploadID string) error {
|
||||||
|
find := &models.TusUpload{UploadID: uploadID}
|
||||||
|
|
||||||
|
var upload models.TusUpload
|
||||||
|
result := t.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
||||||
|
|
||||||
|
if result.RowsAffected == 0 {
|
||||||
|
return errors.New("upload not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
task := t.cron.RetryableTask(cron.RetryableTaskParams{
|
||||||
|
Name: "tusUpload",
|
||||||
|
Function: t.tusUploadTask,
|
||||||
|
Args: []interface{}{&upload},
|
||||||
|
Attempt: 0,
|
||||||
|
Limit: 0,
|
||||||
|
After: func(jobID uuid.UUID, jobName string) {
|
||||||
|
t.logger.Info("Job finished", zap.String("jobName", jobName), zap.String("uploadID", uploadID))
|
||||||
|
err := t.DeleteTusUpload(uploadID)
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Error deleting tus upload", zap.Error(err))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := t.cron.CreateJob(task)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) GetTusUploadReader(hash []byte, start int64) (io.ReadCloser, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
exists, upload := t.TusUploadExists(hash)
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil, metadata.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
meta, err := t.tusStore.GetUpload(ctx, upload.UploadID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := meta.GetInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if start > 0 {
|
||||||
|
endPosition := start + info.Size - 1
|
||||||
|
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, endPosition)
|
||||||
|
ctx = context.WithValue(ctx, "range", rangeHeader)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := meta.GetReader(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return reader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TusHandler) tusUploadTask(upload *models.TusUpload) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
tusUpload, err := t.tusStore.GetUpload(ctx, upload.UploadID)
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not get upload", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
readers := make([]io.ReadCloser, 0)
|
||||||
|
getReader := func() (io.Reader, error) {
|
||||||
|
muReader, err := tusUpload.GetReader(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
readers = append(readers, muReader)
|
||||||
|
return muReader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
for _, reader := range readers {
|
||||||
|
err := reader.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("error closing reader", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
reader, err := getReader()
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not get tus file", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
proof, err := t.storage.HashObject(ctx, reader)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not compute proof", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dbHash, err := hex.DecodeString(upload.Hash)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not decode proof", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(proof.Hash, dbHash) {
|
||||||
|
t.logger.Error("Hashes do not match", zap.Any("upload", upload), zap.Any("proof", proof), zap.Any("dbHash", dbHash))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := tusUpload.GetInfo(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not get tus info", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
storageProtocol := GetStorageProtocol(t.protocol)
|
||||||
|
|
||||||
|
uploadMeta, err := t.storage.UploadObject(ctx, storageProtocol, nil, &renter.MultiPartUploadParams{
|
||||||
|
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
|
||||||
|
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
|
||||||
|
ctx = context.WithValue(ctx, "range", rangeHeader)
|
||||||
|
return tusUpload.GetReader(ctx)
|
||||||
|
},
|
||||||
|
Bucket: upload.Protocol,
|
||||||
|
FileName: "/" + storageProtocol.EncodeFileName(dbHash),
|
||||||
|
Size: uint64(info.Size),
|
||||||
|
}, proof)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not upload file", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s3InfoId, _ := splitS3Ids(upload.UploadID)
|
||||||
|
|
||||||
|
_, err = t.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
|
||||||
|
Bucket: aws.String(t.config.GetString("core.storage.s3.bufferBucket")),
|
||||||
|
Delete: &s3types.Delete{
|
||||||
|
Objects: []s3types.ObjectIdentifier{
|
||||||
|
{
|
||||||
|
Key: aws.String(s3InfoId),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Key: aws.String(s3InfoId + ".info"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Quiet: aws.Bool(true),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not delete upload metadata", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadMeta.UserID = upload.UploaderID
|
||||||
|
uploadMeta.UploaderIP = upload.UploaderIP
|
||||||
|
|
||||||
|
err = t.metadata.SaveUpload(ctx, *uploadMeta)
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not create upload", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = t.accounts.PinByHash(dbHash, upload.UploaderID)
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Error("Could not pin upload", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func splitS3Ids(id string) (objectId, multipartId string) {
|
||||||
|
index := strings.Index(id, "+")
|
||||||
|
if index == -1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
objectId = id[:index]
|
||||||
|
multipartId = id[index+1:]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
func GetStorageProtocol(proto *S5Protocol) storage.StorageProtocol {
|
||||||
|
return interface{}(proto).(storage.StorageProtocol)
|
||||||
|
}
|
210
storage/file.go
210
storage/file.go
|
@ -1,210 +0,0 @@
|
||||||
package storage
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/hex"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/db/models"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/renter"
|
|
||||||
"go.sia.tech/renterd/api"
|
|
||||||
)
|
|
||||||
|
|
||||||
type FileImpl struct {
|
|
||||||
reader io.ReadCloser
|
|
||||||
hash []byte
|
|
||||||
storage *StorageServiceDefault
|
|
||||||
renter *renter.RenterDefault
|
|
||||||
record *models.Upload
|
|
||||||
cid *encoding.CID
|
|
||||||
read bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type FileParams struct {
|
|
||||||
Storage *StorageServiceDefault
|
|
||||||
Renter *renter.RenterDefault
|
|
||||||
Hash []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewFile(params FileParams) *FileImpl {
|
|
||||||
return &FileImpl{hash: params.Hash, storage: params.Storage, renter: params.Renter, read: false}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Exists() bool {
|
|
||||||
exists, _ := f.storage.FileExists(f.hash)
|
|
||||||
|
|
||||||
return exists
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Read(p []byte) (n int, err error) {
|
|
||||||
err = f.init(0)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
f.read = true
|
|
||||||
|
|
||||||
return f.reader.Read(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Seek(offset int64, whence int) (int64, error) {
|
|
||||||
switch whence {
|
|
||||||
case io.SeekStart:
|
|
||||||
if !f.read {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.reader != nil {
|
|
||||||
err := f.reader.Close()
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
f.reader = nil
|
|
||||||
}
|
|
||||||
err := f.init(offset)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
case io.SeekCurrent:
|
|
||||||
return 0, errors.New("not supported")
|
|
||||||
case io.SeekEnd:
|
|
||||||
return int64(f.Size()), nil
|
|
||||||
default:
|
|
||||||
return 0, errors.New("invalid whence")
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Close() error {
|
|
||||||
if f.reader != nil {
|
|
||||||
r := f.reader
|
|
||||||
f.reader = nil
|
|
||||||
return r.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) init(offset int64) error {
|
|
||||||
if f.reader == nil {
|
|
||||||
reader, _, err := f.storage.GetFile(f.hash, offset)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f.reader = reader
|
|
||||||
f.read = false
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Record() (*models.Upload, error) {
|
|
||||||
if f.record == nil {
|
|
||||||
exists, record := f.storage.FileExists(f.hash)
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
return nil, errors.New("file does not exist")
|
|
||||||
}
|
|
||||||
|
|
||||||
f.record = record
|
|
||||||
}
|
|
||||||
|
|
||||||
return f.record, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Hash() []byte {
|
|
||||||
hashStr := f.HashString()
|
|
||||||
|
|
||||||
if hashStr == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
str, err := hex.DecodeString(hashStr)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return str
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) HashString() string {
|
|
||||||
record, err := f.Record()
|
|
||||||
if err != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
return record.Hash
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Name() string {
|
|
||||||
cid, _ := f.CID().ToString()
|
|
||||||
|
|
||||||
return cid
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Modtime() time.Time {
|
|
||||||
record, err := f.Record()
|
|
||||||
if err != nil {
|
|
||||||
return time.Unix(0, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
return record.CreatedAt
|
|
||||||
}
|
|
||||||
func (f *FileImpl) Size() uint64 {
|
|
||||||
record, err := f.Record()
|
|
||||||
if err != nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return record.Size
|
|
||||||
}
|
|
||||||
func (f *FileImpl) CID() *encoding.CID {
|
|
||||||
if f.cid == nil {
|
|
||||||
multihash := encoding.MultihashFromBytes(f.Hash(), types.HashTypeBlake3)
|
|
||||||
cid := encoding.NewCID(types.CIDTypeRaw, *multihash, f.Size())
|
|
||||||
f.cid = cid
|
|
||||||
}
|
|
||||||
return f.cid
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Mime() string {
|
|
||||||
record, err := f.Record()
|
|
||||||
if err != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
return record.MimeType
|
|
||||||
}
|
|
||||||
|
|
||||||
func (f *FileImpl) Protocol() string {
|
|
||||||
record, err := f.Record()
|
|
||||||
if err != nil {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
return record.Protocol
|
|
||||||
}
|
|
||||||
func (f *FileImpl) Proof() ([]byte, error) {
|
|
||||||
object, err := f.renter.GetObject(context.Background(), f.Protocol(), f.HashString(), api.DownloadObjectOptions{})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
proof, err := io.ReadAll(object.Content)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = object.Content.Close()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return proof, nil
|
|
||||||
}
|
|
|
@ -3,685 +3,228 @@ package storage
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/api/middleware"
|
|
||||||
|
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
|
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/account"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/bao"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/cron"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/db/models"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/renter"
|
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
|
||||||
"github.com/aws/aws-sdk-go-v2/config"
|
|
||||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
||||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
||||||
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/spf13/viper"
|
|
||||||
tusd "github.com/tus/tusd/v2/pkg/handler"
|
|
||||||
"github.com/tus/tusd/v2/pkg/s3store"
|
|
||||||
"go.sia.tech/renterd/api"
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
|
||||||
|
"go.sia.tech/renterd/api"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/metadata"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/bao"
|
||||||
|
|
||||||
|
"github.com/spf13/viper"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/renter"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TusPreUploadCreateCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error)
|
const PROOF_EXTENSION = ".obao"
|
||||||
type TusPreFinishResponseCallback func(hook tusd.HookEvent) (tusd.HTTPResponse, error)
|
|
||||||
|
|
||||||
type StorageServiceParams struct {
|
var _ StorageService = (*StorageServiceDefault)(nil)
|
||||||
fx.In
|
|
||||||
Config *viper.Viper
|
type FileNameEncoderFunc func([]byte) string
|
||||||
Logger *zap.Logger
|
|
||||||
Db *gorm.DB
|
type StorageProtocol interface {
|
||||||
Accounts *account.AccountServiceDefault
|
Name() string
|
||||||
Cron *cron.CronServiceDefault
|
EncodeFileName([]byte) string
|
||||||
Renter *renter.RenterDefault
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var Module = fx.Module("storage",
|
var Module = fx.Module("storage",
|
||||||
fx.Provide(
|
fx.Provide(
|
||||||
NewStorageService,
|
NewStorageService,
|
||||||
|
fx.As(new(StorageService)),
|
||||||
),
|
),
|
||||||
fx.Invoke(func(s *StorageServiceDefault) error {
|
|
||||||
return s.init()
|
|
||||||
}),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type StorageService interface {
|
||||||
|
UploadObject(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, muParams *renter.MultiPartUploadParams, proof *bao.Result) (*metadata.UploadMetadata, error)
|
||||||
|
UploadObjectProof(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, proof *bao.Result) error
|
||||||
|
HashObject(ctx context.Context, data io.Reader) (*bao.Result, error)
|
||||||
|
DownloadObject(ctx context.Context, protocol StorageProtocol, objectHash []byte, start int64) (io.ReadCloser, error)
|
||||||
|
DownloadObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) (io.ReadCloser, error)
|
||||||
|
DeleteObject(ctx context.Context, protocol StorageProtocol, objectHash []byte) error
|
||||||
|
DeleteObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
type StorageServiceDefault struct {
|
type StorageServiceDefault struct {
|
||||||
tus *tusd.Handler
|
|
||||||
tusStore tusd.DataStore
|
|
||||||
s3Client *s3.Client
|
|
||||||
config *viper.Viper
|
config *viper.Viper
|
||||||
logger *zap.Logger
|
|
||||||
db *gorm.DB
|
db *gorm.DB
|
||||||
accounts *account.AccountServiceDefault
|
|
||||||
cron *cron.CronServiceDefault
|
|
||||||
renter *renter.RenterDefault
|
renter *renter.RenterDefault
|
||||||
|
logger *zap.Logger
|
||||||
|
metadata metadata.MetadataService
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) Tus() *tusd.Handler {
|
type StorageServiceParams struct {
|
||||||
return s.tus
|
Config *viper.Viper
|
||||||
|
Db *gorm.DB
|
||||||
|
Renter *renter.RenterDefault
|
||||||
|
Logger *zap.Logger
|
||||||
|
metadata metadata.MetadataService
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) Start() error {
|
func NewStorageService(params StorageServiceParams) *StorageServiceDefault {
|
||||||
return nil
|
return &StorageServiceDefault{
|
||||||
}
|
|
||||||
|
|
||||||
func NewStorageService(lc fx.Lifecycle, params StorageServiceParams) *StorageServiceDefault {
|
|
||||||
ss := &StorageServiceDefault{
|
|
||||||
config: params.Config,
|
config: params.Config,
|
||||||
logger: params.Logger,
|
|
||||||
db: params.Db,
|
db: params.Db,
|
||||||
accounts: params.Accounts,
|
|
||||||
cron: params.Cron,
|
|
||||||
renter: params.Renter,
|
renter: params.Renter,
|
||||||
}
|
}
|
||||||
|
|
||||||
lc.Append(fx.Hook{
|
|
||||||
OnStart: func(ctx context.Context) error {
|
|
||||||
go ss.tusWorker()
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
return ss
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, userId uint, userIp string) (*models.Upload, error) {
|
func (s StorageServiceDefault) UploadObject(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, muParams *renter.MultiPartUploadParams, proof *bao.Result) (*metadata.UploadMetadata, error) {
|
||||||
hashResult, len, err := s.GetHashSmall(file)
|
readers := make([]io.ReadCloser, 0)
|
||||||
|
defer func() {
|
||||||
|
for _, reader := range readers {
|
||||||
|
err := reader.Close()
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Error("error closing reader", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
getReader := func() (io.Reader, error) {
|
||||||
|
if muParams != nil {
|
||||||
|
muReader, err := muParams.ReaderFactory(0, uint(muParams.Size))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
exists, upload := s.FileExists(hashResult.Hash)
|
found := false
|
||||||
if exists {
|
for _, reader := range readers {
|
||||||
return upload, nil
|
if reader == muReader {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-seek the file to the beginning after hashing
|
if !found {
|
||||||
_, err = file.Seek(0, io.SeekStart)
|
readers = append(readers, muReader)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := data.Seek(0, io.SeekStart)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hashResult.Hash)).ToBase64Url()
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
reader, err := getReader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = file.Seek(0, io.SeekStart)
|
if proof == nil {
|
||||||
|
hashResult, err := s.HashObject(ctx, reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := io.ReadAll(file)
|
reader, err = getReader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = file.Seek(0, io.SeekStart)
|
proof = hashResult
|
||||||
|
}
|
||||||
|
|
||||||
|
mimeBytes := make([]byte, 512)
|
||||||
|
_, err = io.ReadFull(data, mimeBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mimeType := http.DetectContentType(raw)
|
reader, err = getReader()
|
||||||
|
|
||||||
err = s.renter.CreateBucketIfNotExists(bucket)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.renter.UploadObject(context.Background(), file, bucket, hashStr)
|
mimeType := http.DetectContentType(mimeBytes)
|
||||||
|
|
||||||
|
protocolName := protocol.Name()
|
||||||
|
|
||||||
|
err = s.renter.CreateBucketIfNotExists(protocolName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := protocol.EncodeFileName(proof.Hash)
|
||||||
|
|
||||||
|
err = s.UploadObjectProof(ctx, protocol, nil, proof)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hashResult.Proof), bucket, fmt.Sprintf("%s.bao", hashStr))
|
if muParams != nil {
|
||||||
|
muParams.FileName = filename
|
||||||
|
muParams.Bucket = protocolName
|
||||||
|
|
||||||
|
err = s.renter.UploadObjectMultipart(ctx, muParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err = s.CreateUpload(hashResult.Hash, mimeType, userId, userIp, uint64(len), bucket)
|
return &metadata.UploadMetadata{
|
||||||
if err != nil {
|
Protocol: protocolName,
|
||||||
return nil, err
|
Hash: proof.Hash,
|
||||||
}
|
MimeType: mimeType,
|
||||||
|
Size: uint64(proof.Length),
|
||||||
return upload, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, *s3.Client, error) {
|
|
||||||
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
|
||||||
if service == s3.ServiceID {
|
|
||||||
return aws.Endpoint{
|
|
||||||
URL: s.config.GetString("core.storage.s3.endpoint"),
|
|
||||||
SigningRegion: s.config.GetString("core.storage.s3.region"),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
|
|
||||||
})
|
|
||||||
|
|
||||||
cfg, err := config.LoadDefaultConfig(context.TODO(),
|
err = s.renter.UploadObject(ctx, reader, protocolName, filename)
|
||||||
config.WithRegion("us-east-1"),
|
|
||||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
|
|
||||||
s.config.GetString("core.storage.s3.accessKey"),
|
|
||||||
s.config.GetString("core.storage.s3.secretKey"),
|
|
||||||
"",
|
|
||||||
)),
|
|
||||||
config.WithEndpointResolverWithOptions(customResolver),
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s3Client := s3.NewFromConfig(cfg)
|
return nil, nil
|
||||||
|
|
||||||
store := s3store.New(s.config.GetString("core.storage.s3.bufferBucket"), s3Client)
|
|
||||||
|
|
||||||
locker := NewMySQLLocker(s.db, s.logger)
|
|
||||||
|
|
||||||
composer := tusd.NewStoreComposer()
|
|
||||||
store.UseIn(composer)
|
|
||||||
composer.UseLocker(locker)
|
|
||||||
|
|
||||||
handler, err := tusd.NewHandler(tusd.Config{
|
|
||||||
BasePath: basePath,
|
|
||||||
StoreComposer: composer,
|
|
||||||
DisableDownload: true,
|
|
||||||
NotifyCompleteUploads: true,
|
|
||||||
NotifyTerminatedUploads: true,
|
|
||||||
NotifyCreatedUploads: true,
|
|
||||||
RespectForwardedHeaders: true,
|
|
||||||
PreUploadCreateCallback: preUploadCb,
|
|
||||||
})
|
|
||||||
|
|
||||||
return handler, store, s3Client, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) init() error {
|
func (s StorageServiceDefault) UploadObjectProof(ctx context.Context, protocol StorageProtocol, data io.ReadSeeker, proof *bao.Result) error {
|
||||||
preUpload := func(hook tusd.HookEvent) (tusd.HTTPResponse, tusd.FileInfoChanges, error) {
|
if proof == nil {
|
||||||
blankResp := tusd.HTTPResponse{}
|
hashResult, err := s.HashObject(ctx, data)
|
||||||
blankChanges := tusd.FileInfoChanges{}
|
|
||||||
|
|
||||||
hash, ok := hook.Upload.MetaData["hash"]
|
|
||||||
if !ok {
|
|
||||||
return blankResp, blankChanges, errors.New("missing hash")
|
|
||||||
}
|
|
||||||
|
|
||||||
decodedHash, err := encoding.MultihashFromBase64Url(hash)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return blankResp, blankChanges, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
exists, _ := s.FileExists(decodedHash.HashBytes())
|
proof = hashResult
|
||||||
|
|
||||||
if exists {
|
|
||||||
return blankResp, blankChanges, errors.New("file already exists")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
exists, _ = s.TusUploadExists(decodedHash.HashBytes())
|
protocolName := protocol.Name()
|
||||||
|
|
||||||
if exists {
|
err := s.renter.CreateBucketIfNotExists(protocolName)
|
||||||
return blankResp, blankChanges, errors.New("file is already being uploaded")
|
|
||||||
}
|
|
||||||
|
|
||||||
return blankResp, blankChanges, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
tus, store, s3client, err := s.BuildUploadBufferTus("/s5/upload/tus", preUpload, nil)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.tus = tus
|
return s.renter.UploadObject(ctx, bytes.NewReader(proof.Proof), protocolName, s.getProofPath(protocol, proof.Hash))
|
||||||
s.tusStore = store
|
|
||||||
s.s3Client = s3client
|
|
||||||
|
|
||||||
s.cron.RegisterService(s)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (s *StorageServiceDefault) LoadInitialTasks(cron cron.CronService) error {
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) FileExists(hash []byte) (bool, *models.Upload) {
|
func (s StorageServiceDefault) HashObject(ctx context.Context, data io.Reader) (*bao.Result, error) {
|
||||||
hashStr := hex.EncodeToString(hash)
|
result, err := bao.Hash(data)
|
||||||
|
|
||||||
var upload models.Upload
|
if err != nil {
|
||||||
result := s.db.Model(&models.Upload{}).Where(&models.Upload{Hash: hashStr}).First(&upload)
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return result.RowsAffected > 0, &upload
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) (*bao.Result, int, error) {
|
func (s StorageServiceDefault) DownloadObject(ctx context.Context, protocol StorageProtocol, objectHash []byte, start int64) (io.ReadCloser, error) {
|
||||||
buf := bytes.NewBuffer(nil)
|
|
||||||
|
|
||||||
_, err := io.Copy(buf, file)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
result, _, err := bao.Hash(buf)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, buf.Len(), nil
|
|
||||||
}
|
|
||||||
func (s *StorageServiceDefault) GetHash(file io.Reader) (*bao.Result, int, error) {
|
|
||||||
hash, totalBytes, err := bao.Hash(file)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return hash, totalBytes, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) CreateUpload(hash []byte, mime string, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error) {
|
|
||||||
hashStr := hex.EncodeToString(hash)
|
|
||||||
|
|
||||||
upload := &models.Upload{
|
|
||||||
Hash: hashStr,
|
|
||||||
MimeType: mime,
|
|
||||||
UserID: uploaderID,
|
|
||||||
UploaderIP: uploaderIP,
|
|
||||||
Protocol: protocol,
|
|
||||||
Size: size,
|
|
||||||
}
|
|
||||||
|
|
||||||
result := s.db.Create(upload)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return upload, nil
|
|
||||||
}
|
|
||||||
func (s *StorageServiceDefault) tusWorker() {
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case info := <-s.tus.CreatedUploads:
|
|
||||||
hash, ok := info.Upload.MetaData["hash"]
|
|
||||||
errorResponse := tusd.HTTPResponse{StatusCode: 400, Header: nil}
|
|
||||||
if !ok {
|
|
||||||
s.logger.Error("Missing hash in metadata")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
uploaderID, ok := info.Context.Value(middleware.DEFAULT_AUTH_CONTEXT_KEY).(uint64)
|
|
||||||
if !ok {
|
|
||||||
errorResponse.Body = "Missing user id in context"
|
|
||||||
info.Upload.StopUpload(errorResponse)
|
|
||||||
s.logger.Error("Missing user id in context")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
uploaderIP := info.HTTPRequest.RemoteAddr
|
|
||||||
|
|
||||||
decodedHash, err := encoding.MultihashFromBase64Url(hash)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
errorResponse.Body = "Could not decode hash"
|
|
||||||
info.Upload.StopUpload(errorResponse)
|
|
||||||
s.logger.Error("Could not decode hash", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = s.CreateTusUpload(decodedHash.HashBytes(), info.Upload.ID, uint(uploaderID), uploaderIP, info.Context.Value("protocol").(string))
|
|
||||||
if err != nil {
|
|
||||||
errorResponse.Body = "Could not create tus upload"
|
|
||||||
info.Upload.StopUpload(errorResponse)
|
|
||||||
s.logger.Error("Could not create tus upload", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
case info := <-s.tus.UploadProgress:
|
|
||||||
err := s.TusUploadProgress(info.Upload.ID)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not update tus upload", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
case info := <-s.tus.TerminatedUploads:
|
|
||||||
err := s.DeleteTusUpload(info.Upload.ID)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not delete tus upload", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
case info := <-s.tus.CompleteUploads:
|
|
||||||
if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
err := s.TusUploadCompleted(info.Upload.ID)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not complete tus upload", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
err = s.ScheduleTusUpload(info.Upload.ID)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not schedule tus upload", zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) TusUploadExists(hash []byte) (bool, models.TusUpload) {
|
|
||||||
hashStr := hex.EncodeToString(hash)
|
|
||||||
|
|
||||||
var upload models.TusUpload
|
|
||||||
result := s.db.Model(&models.TusUpload{}).Where(&models.TusUpload{Hash: hashStr}).First(&upload)
|
|
||||||
|
|
||||||
return result.RowsAffected > 0, upload
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) CreateTusUpload(hash []byte, uploadID string, uploaderID uint, uploaderIP string, protocol string) (*models.TusUpload, error) {
|
|
||||||
hashStr := hex.EncodeToString(hash)
|
|
||||||
|
|
||||||
upload := &models.TusUpload{
|
|
||||||
Hash: hashStr,
|
|
||||||
UploadID: uploadID,
|
|
||||||
UploaderID: uploaderID,
|
|
||||||
UploaderIP: uploaderIP,
|
|
||||||
Uploader: models.User{},
|
|
||||||
Protocol: protocol,
|
|
||||||
}
|
|
||||||
|
|
||||||
result := s.db.Create(upload)
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return nil, result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return upload, nil
|
|
||||||
}
|
|
||||||
func (s *StorageServiceDefault) TusUploadProgress(uploadID string) error {
|
|
||||||
|
|
||||||
find := &models.TusUpload{UploadID: uploadID}
|
|
||||||
|
|
||||||
var upload models.TusUpload
|
|
||||||
result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
|
||||||
|
|
||||||
if result.RowsAffected == 0 {
|
|
||||||
return errors.New("upload not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
result = s.db.Model(&models.TusUpload{}).Where(find).Update("updated_at", time.Now())
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (s *StorageServiceDefault) TusUploadCompleted(uploadID string) error {
|
|
||||||
|
|
||||||
find := &models.TusUpload{UploadID: uploadID}
|
|
||||||
|
|
||||||
var upload models.TusUpload
|
|
||||||
result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
|
||||||
|
|
||||||
if result.RowsAffected == 0 {
|
|
||||||
return errors.New("upload not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
result = s.db.Model(&models.TusUpload{}).Where(find).Update("completed", true)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (s *StorageServiceDefault) DeleteTusUpload(uploadID string) error {
|
|
||||||
result := s.db.Where(&models.TusUpload{UploadID: uploadID}).Delete(&models.TusUpload{})
|
|
||||||
|
|
||||||
if result.Error != nil {
|
|
||||||
return result.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) ScheduleTusUpload(uploadID string) error {
|
|
||||||
find := &models.TusUpload{UploadID: uploadID}
|
|
||||||
|
|
||||||
var upload models.TusUpload
|
|
||||||
result := s.db.Model(&models.TusUpload{}).Where(find).First(&upload)
|
|
||||||
|
|
||||||
if result.RowsAffected == 0 {
|
|
||||||
return errors.New("upload not found")
|
|
||||||
}
|
|
||||||
|
|
||||||
task := s.cron.RetryableTask(cron.RetryableTaskParams{
|
|
||||||
Name: "tusUpload",
|
|
||||||
Function: s.tusUploadTask,
|
|
||||||
Args: []interface{}{&upload},
|
|
||||||
Attempt: 0,
|
|
||||||
Limit: 0,
|
|
||||||
After: func(jobID uuid.UUID, jobName string) {
|
|
||||||
s.logger.Info("Job finished", zap.String("jobName", jobName), zap.String("uploadID", uploadID))
|
|
||||||
err := s.DeleteTusUpload(uploadID)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Error deleting tus upload", zap.Error(err))
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
_, err := s.cron.CreateJob(task)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
|
|
||||||
ctx := context.Background()
|
|
||||||
tusUpload, err := s.tusStore.GetUpload(ctx, upload.UploadID)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not get upload", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
readerHash, err := tusUpload.GetReader(ctx)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not get tus file", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func(reader io.ReadCloser) {
|
|
||||||
err := reader.Close()
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not close reader", zap.Error(err))
|
|
||||||
}
|
|
||||||
}(readerHash)
|
|
||||||
|
|
||||||
hash, byteCount, err := s.GetHash(readerHash)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not compute hash", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
dbHash, err := hex.DecodeString(upload.Hash)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not decode hash", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !bytes.Equal(hash.Hash, dbHash) {
|
|
||||||
s.logger.Error("Hashes do not match", zap.Any("upload", upload), zap.Any("hash", hash), zap.Any("dbHash", dbHash))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
readerMime, err := tusUpload.GetReader(ctx)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not get tus file", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func(reader io.ReadCloser) {
|
|
||||||
err := reader.Close()
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not close reader", zap.Error(err))
|
|
||||||
}
|
|
||||||
}(readerMime)
|
|
||||||
|
|
||||||
var mimeBuf [512]byte
|
|
||||||
|
|
||||||
_, err = readerMime.Read(mimeBuf[:])
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not read mime", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
mimeType := http.DetectContentType(mimeBuf[:])
|
|
||||||
|
|
||||||
upload.MimeType = mimeType
|
|
||||||
|
|
||||||
if tx := s.db.Save(upload); tx.Error != nil {
|
|
||||||
s.logger.Error("Could not update tus upload", zap.Error(tx.Error))
|
|
||||||
return tx.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash.Hash)).ToBase64Url()
|
|
||||||
err = s.renter.CreateBucketIfNotExists(upload.Protocol)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
info, err := tusUpload.GetInfo(context.Background())
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not get tus info", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.renter.MultipartUpload(renter.MultiPartUploadParams{
|
|
||||||
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
|
|
||||||
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
|
|
||||||
ctx = context.WithValue(ctx, "range", rangeHeader)
|
|
||||||
return tusUpload.GetReader(ctx)
|
|
||||||
},
|
|
||||||
Bucket: upload.Protocol,
|
|
||||||
FileName: "/" + hashStr,
|
|
||||||
Size: uint64(info.Size),
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not upload file", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hash.Proof), upload.Protocol, fmt.Sprintf("%s.bao", hashStr))
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s3InfoId, _ := splitS3Ids(upload.UploadID)
|
|
||||||
|
|
||||||
_, err = s.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
|
|
||||||
Bucket: aws.String(s.config.GetString("core.storage.s3.bufferBucket")),
|
|
||||||
Delete: &s3types.Delete{
|
|
||||||
Objects: []s3types.ObjectIdentifier{
|
|
||||||
{
|
|
||||||
Key: aws.String(s3InfoId),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Key: aws.String(s3InfoId + ".info"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Quiet: aws.Bool(true),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not delete upload metadata", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
newUpload, err := s.CreateUpload(dbHash, mimeType, upload.UploaderID, upload.UploaderIP, uint64(byteCount), upload.Protocol)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not create upload", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.accounts.PinByID(newUpload.ID, upload.UploaderID)
|
|
||||||
if err != nil {
|
|
||||||
s.logger.Error("Could not pin upload", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) getPrefixedHash(hash []byte) []byte {
|
|
||||||
return append([]byte{byte(types.HashTypeBlake3)}, hash...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func splitS3Ids(id string) (objectId, multipartId string) {
|
|
||||||
index := strings.Index(id, "+")
|
|
||||||
if index == -1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
objectId = id[:index]
|
|
||||||
multipartId = id[index+1:]
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *StorageServiceDefault) GetFile(hash []byte, start int64) (io.ReadCloser, int64, error) {
|
|
||||||
if exists, tusUpload := s.TusUploadExists(hash); exists {
|
|
||||||
if tusUpload.Completed {
|
|
||||||
upload, err := s.tusStore.GetUpload(context.Background(), tusUpload.UploadID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
info, _ := upload.GetInfo(context.Background())
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
if start > 0 {
|
|
||||||
endPosition := start + info.Size - 1
|
|
||||||
rangeHeader := fmt.Sprintf("bytes=%d-%d", start, endPosition)
|
|
||||||
ctx = context.WithValue(ctx, "range", rangeHeader)
|
|
||||||
}
|
|
||||||
|
|
||||||
reader, err := upload.GetReader(ctx)
|
|
||||||
|
|
||||||
return reader, info.Size, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
exists, upload := s.FileExists(hash)
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
return nil, 0, errors.New("file does not exist")
|
|
||||||
}
|
|
||||||
|
|
||||||
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var partialRange api.DownloadRange
|
var partialRange api.DownloadRange
|
||||||
|
|
||||||
|
upload, err := s.metadata.GetUpload(ctx, objectHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if start > 0 {
|
if start > 0 {
|
||||||
partialRange = api.DownloadRange{
|
partialRange = api.DownloadRange{
|
||||||
Offset: start,
|
Offset: start,
|
||||||
|
@ -690,20 +233,41 @@ func (s *StorageServiceDefault) GetFile(hash []byte, start int64) (io.ReadCloser
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object, err := s.renter.GetObject(context.Background(), upload.Protocol, hashStr, api.DownloadObjectOptions{
|
object, err := s.renter.GetObject(ctx, protocol.Name(), protocol.EncodeFileName(objectHash), api.DownloadObjectOptions{Range: partialRange})
|
||||||
Range: partialRange,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return object.Content, int64(upload.Size), nil
|
return object.Content, nil
|
||||||
}
|
}
|
||||||
func (s *StorageServiceDefault) NewFile(hash []byte) *FileImpl {
|
|
||||||
return NewFile(FileParams{
|
func (s StorageServiceDefault) DownloadObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) (io.ReadCloser, error) {
|
||||||
Storage: s,
|
object, err := s.renter.GetObject(ctx, protocol.Name(), protocol.EncodeFileName(objectHash)+".bao", api.DownloadObjectOptions{})
|
||||||
Renter: s.renter,
|
if err != nil {
|
||||||
Hash: hash,
|
return nil, err
|
||||||
})
|
}
|
||||||
|
|
||||||
|
return object.Content, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StorageServiceDefault) DeleteObject(ctx context.Context, protocol StorageProtocol, objectHash []byte) error {
|
||||||
|
err := s.renter.DeleteObject(ctx, protocol.Name(), protocol.EncodeFileName(objectHash))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StorageServiceDefault) DeleteObjectProof(ctx context.Context, protocol StorageProtocol, objectHash []byte) error {
|
||||||
|
err := s.renter.DeleteObject(ctx, protocol.Name(), s.getProofPath(protocol, objectHash))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s StorageServiceDefault) getProofPath(protocol StorageProtocol, objectHash []byte) string {
|
||||||
|
return fmt.Sprintf("%s/%s.%s", protocol.Name(), protocol.EncodeFileName(objectHash), PROOF_EXTENSION)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue