feat: implement adding bao hashing and uploading proofs
This commit is contained in:
parent
2a1abb852b
commit
c790c525ae
|
@ -10,6 +10,7 @@ import (
|
||||||
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
"git.lumeweb.com/LumeWeb/libs5-go/types"
|
||||||
"git.lumeweb.com/LumeWeb/portal/account"
|
"git.lumeweb.com/LumeWeb/portal/account"
|
||||||
"git.lumeweb.com/LumeWeb/portal/api/middleware"
|
"git.lumeweb.com/LumeWeb/portal/api/middleware"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/bao"
|
||||||
"git.lumeweb.com/LumeWeb/portal/cron"
|
"git.lumeweb.com/LumeWeb/portal/cron"
|
||||||
"git.lumeweb.com/LumeWeb/portal/db/models"
|
"git.lumeweb.com/LumeWeb/portal/db/models"
|
||||||
"git.lumeweb.com/LumeWeb/portal/renter"
|
"git.lumeweb.com/LumeWeb/portal/renter"
|
||||||
|
@ -27,7 +28,6 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"io"
|
"io"
|
||||||
"lukechampine.com/blake3"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -95,9 +95,9 @@ func NewStorageService(lc fx.Lifecycle, params StorageServiceParams) *StorageSer
|
||||||
return ss
|
return ss
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, generateProof bool) ([]byte, error) {
|
func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string) ([]byte, error) {
|
||||||
hash, err := s.GetHashSmall(file)
|
hash, err := s.GetHashSmall(file)
|
||||||
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
|
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash.Hash)).ToBase64Url()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -118,21 +118,13 @@ func (s StorageServiceDefault) PutFileSmall(file io.ReadSeeker, bucket string, g
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return hash[:], nil
|
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hash.Proof), bucket, fmt.Sprintf("%s.bao", hashStr))
|
||||||
}
|
|
||||||
func (s StorageServiceDefault) PutFile(file io.Reader, bucket string, hash []byte) error {
|
|
||||||
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
|
|
||||||
err = s.renter.CreateBucketIfNotExists(bucket)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.renter.UploadObject(context.Background(), file, bucket, hashStr)
|
return hash.Hash[:], nil
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, *s3.Client, error) {
|
func (s *StorageServiceDefault) BuildUploadBufferTus(basePath string, preUploadCb TusPreUploadCreateCallback, preFinishCb TusPreFinishResponseCallback) (*tusd.Handler, tusd.DataStore, *s3.Client, error) {
|
||||||
|
@ -241,7 +233,7 @@ func (s *StorageServiceDefault) FileExists(hash []byte) (bool, models.Upload) {
|
||||||
return result.RowsAffected > 0, upload
|
return result.RowsAffected > 0, upload
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) ([]byte, error) {
|
func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) (*bao.Result, error) {
|
||||||
buf := bytes.NewBuffer(nil)
|
buf := bytes.NewBuffer(nil)
|
||||||
|
|
||||||
_, err := io.Copy(buf, file)
|
_, err := io.Copy(buf, file)
|
||||||
|
@ -249,22 +241,21 @@ func (s *StorageServiceDefault) GetHashSmall(file io.ReadSeeker) ([]byte, error)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := blake3.Sum256(buf.Bytes())
|
result, _, err := bao.Hash(buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return hash[:], nil
|
return result, nil
|
||||||
}
|
}
|
||||||
func (s *StorageServiceDefault) GetHash(file io.Reader) ([]byte, int64, error) {
|
func (s *StorageServiceDefault) GetHash(file io.Reader) (*bao.Result, int, error) {
|
||||||
hasher := blake3.New(64, nil)
|
hash, totalBytes, err := bao.Hash(file)
|
||||||
|
|
||||||
totalBytes, err := io.Copy(hasher, file)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := hasher.Sum(nil)
|
return hash, totalBytes, nil
|
||||||
|
|
||||||
return hash[:32], totalBytes, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StorageServiceDefault) CreateUpload(hash []byte, mime string, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error) {
|
func (s *StorageServiceDefault) CreateUpload(hash []byte, mime string, uploaderID uint, uploaderIP string, size uint64, protocol string) (*models.Upload, error) {
|
||||||
|
@ -498,7 +489,7 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(hash, dbHash) {
|
if !bytes.Equal(hash.Hash, dbHash) {
|
||||||
s.logger.Error("Hashes do not match", zap.Any("upload", upload), zap.Any("hash", hash), zap.Any("dbHash", dbHash))
|
s.logger.Error("Hashes do not match", zap.Any("upload", upload), zap.Any("hash", hash), zap.Any("dbHash", dbHash))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -534,7 +525,7 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
|
||||||
return tx.Error
|
return tx.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash)).ToBase64Url()
|
hashStr, err := encoding.NewMultihash(s.getPrefixedHash(hash.Hash)).ToBase64Url()
|
||||||
err = s.renter.CreateBucketIfNotExists(upload.Protocol)
|
err = s.renter.CreateBucketIfNotExists(upload.Protocol)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -562,6 +553,12 @@ func (s *StorageServiceDefault) tusUploadTask(upload *models.TusUpload) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = s.renter.UploadObject(context.Background(), bytes.NewReader(hash.Proof), upload.Protocol, fmt.Sprintf("%s.bao", hashStr))
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
s3InfoId, _ := splitS3Ids(upload.UploadID)
|
s3InfoId, _ := splitS3Ids(upload.UploadID)
|
||||||
|
|
||||||
_, err = s.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
|
_, err = s.s3Client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
|
||||||
|
|
Loading…
Reference in New Issue