Compare commits
11 Commits
4b712a3a80
...
26042b62ac
Author | SHA1 | Date |
---|---|---|
Derrick Hammer | 26042b62ac | |
Derrick Hammer | 038d2c440b | |
Derrick Hammer | 96ac75bf3f | |
Derrick Hammer | 56d61895f5 | |
Derrick Hammer | 89ef950432 | |
Derrick Hammer | 396b3f60a8 | |
Derrick Hammer | e7d1bd0f09 | |
Derrick Hammer | e8c232dfdd | |
Derrick Hammer | 39936b3b14 | |
Derrick Hammer | 7845f95776 | |
Derrick Hammer | 6d5b9d880b |
|
@ -5,8 +5,8 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"git.lumeweb.com/LumeWeb/portal/db"
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
"git.lumeweb.com/LumeWeb/portal/model"
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
|
||||||
_validator "git.lumeweb.com/LumeWeb/portal/validator"
|
_validator "git.lumeweb.com/LumeWeb/portal/validator"
|
||||||
"github.com/go-playground/validator/v10"
|
"github.com/go-playground/validator/v10"
|
||||||
"github.com/kataras/iris/v12"
|
"github.com/kataras/iris/v12"
|
||||||
|
@ -59,7 +59,7 @@ func hashPassword(password string) (string, error) {
|
||||||
// Generate a new bcrypt hash from the provided password.
|
// Generate a new bcrypt hash from the provided password.
|
||||||
hashedPassword, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
|
hashedPassword, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to hash password", zap.Error(err))
|
logger.Get().Error("failed to hash password", zap.Error(err))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,7 +71,7 @@ func (a *AccountController) PostRegister() {
|
||||||
var r RegisterRequest
|
var r RegisterRequest
|
||||||
|
|
||||||
if err := a.Ctx.ReadJSON(&r); err != nil {
|
if err := a.Ctx.ReadJSON(&r); err != nil {
|
||||||
shared.GetLogger().Debug("failed to parse request", zap.Error(err))
|
logger.Get().Debug("failed to parse request", zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -80,13 +80,13 @@ func (a *AccountController) PostRegister() {
|
||||||
existingAccount := model.Account{}
|
existingAccount := model.Account{}
|
||||||
err := db.Get().Where("email = ?", r.Email).First(&existingAccount).Error
|
err := db.Get().Where("email = ?", r.Email).First(&existingAccount).Error
|
||||||
if err == nil {
|
if err == nil {
|
||||||
shared.GetLogger().Debug("account with email already exists", zap.Error(err), zap.String("email", r.Email))
|
logger.Get().Debug("account with email already exists", zap.Error(err), zap.String("email", r.Email))
|
||||||
// An account with the same email address already exists.
|
// An account with the same email address already exists.
|
||||||
// Return an error response to the client.
|
// Return an error response to the client.
|
||||||
a.Ctx.StopWithError(iris.StatusConflict, errors.New("an account with this email address already exists"))
|
a.Ctx.StopWithError(iris.StatusConflict, errors.New("an account with this email address already exists"))
|
||||||
return
|
return
|
||||||
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
|
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
shared.GetLogger().Error("error querying accounts", zap.Error(err), zap.String("email", r.Email))
|
logger.Get().Error("error querying accounts", zap.Error(err), zap.String("email", r.Email))
|
||||||
// An unexpected error occurred while querying the database.
|
// An unexpected error occurred while querying the database.
|
||||||
// Return an error response to the client.
|
// Return an error response to the client.
|
||||||
a.Ctx.StopWithError(iris.StatusInternalServerError, err)
|
a.Ctx.StopWithError(iris.StatusInternalServerError, err)
|
||||||
|
@ -125,7 +125,7 @@ func (a *AccountController) PostRegister() {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to create account", zap.Error(err))
|
logger.Get().Error("failed to create account", zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusInternalServerError, err)
|
a.Ctx.StopWithError(iris.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,8 +6,8 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.lumeweb.com/LumeWeb/portal/db"
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
"git.lumeweb.com/LumeWeb/portal/model"
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
|
||||||
"github.com/joomcode/errorx"
|
"github.com/joomcode/errorx"
|
||||||
"github.com/kataras/iris/v12"
|
"github.com/kataras/iris/v12"
|
||||||
"github.com/kataras/jwt"
|
"github.com/kataras/jwt"
|
||||||
|
@ -75,7 +75,7 @@ func generateToken(maxAge time.Duration) (string, error) {
|
||||||
token, err := jwt.Sign(jwt.HS256, sharedKey, claim, jwt.MaxAge(maxAge))
|
token, err := jwt.Sign(jwt.HS256, sharedKey, claim, jwt.MaxAge(maxAge))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to sign jwt", zap.Error(err))
|
logger.Get().Error("failed to sign jwt", zap.Error(err))
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ func generateAndSaveLoginToken(accountID uint, maxAge time.Duration) (string, er
|
||||||
// Generate a JWT token for the authenticated user.
|
// Generate a JWT token for the authenticated user.
|
||||||
token, err := generateToken(maxAge)
|
token, err := generateToken(maxAge)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to generate token", zap.Error(err))
|
logger.Get().Error("failed to generate token", zap.Error(err))
|
||||||
return "", fmt.Errorf("failed to generate token: %s", err)
|
return "", fmt.Errorf("failed to generate token: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +104,7 @@ func generateAndSaveLoginToken(accountID uint, maxAge time.Duration) (string, er
|
||||||
|
|
||||||
if err := db.Get().Create(&session).Error; err != nil {
|
if err := db.Get().Create(&session).Error; err != nil {
|
||||||
msg := "failed to save token"
|
msg := "failed to save token"
|
||||||
shared.GetLogger().Error(msg, zap.Error(err))
|
logger.Get().Error(msg, zap.Error(err))
|
||||||
return "", errorx.Decorate(err, msg)
|
return "", errorx.Decorate(err, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,7 +115,7 @@ func generateAndSaveChallengeToken(accountID uint, maxAge time.Duration) (string
|
||||||
// Generate a JWT token for the authenticated user.
|
// Generate a JWT token for the authenticated user.
|
||||||
token, err := generateToken(maxAge)
|
token, err := generateToken(maxAge)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to generate token", zap.Error(err))
|
logger.Get().Error("failed to generate token", zap.Error(err))
|
||||||
return "", fmt.Errorf("failed to generate token: %s", err)
|
return "", fmt.Errorf("failed to generate token: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,7 +133,7 @@ func generateAndSaveChallengeToken(accountID uint, maxAge time.Duration) (string
|
||||||
|
|
||||||
if err := db.Get().Create(&keyChallenge).Error; err != nil {
|
if err := db.Get().Create(&keyChallenge).Error; err != nil {
|
||||||
msg := "failed to save token"
|
msg := "failed to save token"
|
||||||
shared.GetLogger().Error(msg, zap.Error(err))
|
logger.Get().Error(msg, zap.Error(err))
|
||||||
return "", errorx.Decorate(err, msg)
|
return "", errorx.Decorate(err, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,7 +146,7 @@ func (a *AuthController) PostLogin() {
|
||||||
|
|
||||||
// Read the login request from the client.
|
// Read the login request from the client.
|
||||||
if err := a.Ctx.ReadJSON(&r); err != nil {
|
if err := a.Ctx.ReadJSON(&r); err != nil {
|
||||||
shared.GetLogger().Debug("failed to parse request", zap.Error(err))
|
logger.Get().Debug("failed to parse request", zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -155,7 +155,7 @@ func (a *AuthController) PostLogin() {
|
||||||
account := model.Account{}
|
account := model.Account{}
|
||||||
if err := db.Get().Where("email = ?", r.Email).First(&account).Error; err != nil {
|
if err := db.Get().Where("email = ?", r.Email).First(&account).Error; err != nil {
|
||||||
msg := "invalid email or password"
|
msg := "invalid email or password"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err))
|
logger.Get().Debug(msg, zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -163,7 +163,7 @@ func (a *AuthController) PostLogin() {
|
||||||
// Verify the provided password against the hashed password stored in the database.
|
// Verify the provided password against the hashed password stored in the database.
|
||||||
if err := verifyPassword(*account.Password, r.Password); err != nil {
|
if err := verifyPassword(*account.Password, r.Password); err != nil {
|
||||||
msg := "invalid email or password"
|
msg := "invalid email or password"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err))
|
logger.Get().Debug(msg, zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -171,7 +171,7 @@ func (a *AuthController) PostLogin() {
|
||||||
// Generate a JWT token for the authenticated user.
|
// Generate a JWT token for the authenticated user.
|
||||||
token, err := generateAndSaveLoginToken(account.ID, 24*time.Hour)
|
token, err := generateAndSaveLoginToken(account.ID, 24*time.Hour)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Debug("failed to generate token", zap.Error(err))
|
logger.Get().Debug("failed to generate token", zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusInternalServerError, fmt.Errorf("failed to generate token: %s", err))
|
a.Ctx.StopWithError(iris.StatusInternalServerError, fmt.Errorf("failed to generate token: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -179,7 +179,7 @@ func (a *AuthController) PostLogin() {
|
||||||
// Return the JWT token to the client.
|
// Return the JWT token to the client.
|
||||||
err = a.Ctx.JSON(&LoginResponse{Token: token})
|
err = a.Ctx.JSON(&LoginResponse{Token: token})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to generate response", zap.Error(err))
|
logger.Get().Error("failed to generate response", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,7 +189,7 @@ func (a *AuthController) PostPubkeyChallenge() {
|
||||||
|
|
||||||
// Read the login request from the client.
|
// Read the login request from the client.
|
||||||
if err := a.Ctx.ReadJSON(&r); err != nil {
|
if err := a.Ctx.ReadJSON(&r); err != nil {
|
||||||
shared.GetLogger().Debug("failed to parse request", zap.Error(err))
|
logger.Get().Debug("failed to parse request", zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -221,7 +221,7 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
|
|
||||||
// Read the key login request from the client.
|
// Read the key login request from the client.
|
||||||
if err := a.Ctx.ReadJSON(&r); err != nil {
|
if err := a.Ctx.ReadJSON(&r); err != nil {
|
||||||
shared.GetLogger().Debug("failed to parse request", zap.Error(err))
|
logger.Get().Debug("failed to parse request", zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,7 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
challenge := model.KeyChallenge{}
|
challenge := model.KeyChallenge{}
|
||||||
if err := db.Get().Where("challenge = ?", r.Challenge).Preload("Key").First(&challenge).Error; err != nil {
|
if err := db.Get().Where("challenge = ?", r.Challenge).Preload("Key").First(&challenge).Error; err != nil {
|
||||||
msg := "invalid key challenge"
|
msg := "invalid key challenge"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err), zap.String("challenge", r.Challenge))
|
logger.Get().Debug(msg, zap.Error(err), zap.String("challenge", r.Challenge))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -238,7 +238,7 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
verifiedToken, err := jwt.Verify(jwt.HS256, sharedKey, []byte(r.Challenge), blocklist)
|
verifiedToken, err := jwt.Verify(jwt.HS256, sharedKey, []byte(r.Challenge), blocklist)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "invalid key challenge"
|
msg := "invalid key challenge"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err), zap.String("challenge", r.Challenge))
|
logger.Get().Debug(msg, zap.Error(err), zap.String("challenge", r.Challenge))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -246,7 +246,7 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
rawPubKey, err := hex.DecodeString(r.Pubkey)
|
rawPubKey, err := hex.DecodeString(r.Pubkey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "invalid pubkey"
|
msg := "invalid pubkey"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err), zap.String("pubkey", r.Pubkey))
|
logger.Get().Debug(msg, zap.Error(err), zap.String("pubkey", r.Pubkey))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -254,7 +254,7 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
rawSignature, err := hex.DecodeString(r.Signature)
|
rawSignature, err := hex.DecodeString(r.Signature)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "invalid signature"
|
msg := "invalid signature"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err), zap.String("signature", r.Signature))
|
logger.Get().Debug(msg, zap.Error(err), zap.String("signature", r.Signature))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -264,7 +264,7 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
// Verify the challenge signature.
|
// Verify the challenge signature.
|
||||||
if !ed25519.Verify(publicKeyDecoded, []byte(r.Challenge), rawSignature) {
|
if !ed25519.Verify(publicKeyDecoded, []byte(r.Challenge), rawSignature) {
|
||||||
msg := "invalid challenge"
|
msg := "invalid challenge"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err), zap.String("challenge", r.Challenge))
|
logger.Get().Debug(msg, zap.Error(err), zap.String("challenge", r.Challenge))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -278,14 +278,14 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
err = blocklist.InvalidateToken(verifiedToken.Token, verifiedToken.StandardClaims)
|
err = blocklist.InvalidateToken(verifiedToken.Token, verifiedToken.StandardClaims)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "failed to invalidate token"
|
msg := "failed to invalidate token"
|
||||||
shared.GetLogger().Error(msg, zap.Error(err), zap.String("token", hex.EncodeToString(verifiedToken.Token)))
|
logger.Get().Error(msg, zap.Error(err), zap.String("token", hex.EncodeToString(verifiedToken.Token)))
|
||||||
a.Ctx.StopWithError(iris.StatusInternalServerError, errorx.RejectedOperation.Wrap(err, msg))
|
a.Ctx.StopWithError(iris.StatusInternalServerError, errorx.RejectedOperation.Wrap(err, msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.Get().Delete(&challenge).Error; err != nil {
|
if err := db.Get().Delete(&challenge).Error; err != nil {
|
||||||
msg := "failed to delete key challenge"
|
msg := "failed to delete key challenge"
|
||||||
shared.GetLogger().Error(msg, zap.Error(err), zap.Any("key_challenge", challenge))
|
logger.Get().Error(msg, zap.Error(err), zap.Any("key_challenge", challenge))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errorx.RejectedOperation.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -293,7 +293,7 @@ func (a *AuthController) PostPubkeyLogin() {
|
||||||
// Return the JWT token to the client.
|
// Return the JWT token to the client.
|
||||||
err = a.Ctx.JSON(&LoginResponse{Token: token})
|
err = a.Ctx.JSON(&LoginResponse{Token: token})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to create response", zap.Error(err))
|
logger.Get().Error("failed to create response", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -304,7 +304,7 @@ func (a *AuthController) PostLogout() {
|
||||||
|
|
||||||
// Read the logout request from the client.
|
// Read the logout request from the client.
|
||||||
if err := a.Ctx.ReadJSON(&r); err != nil {
|
if err := a.Ctx.ReadJSON(&r); err != nil {
|
||||||
shared.GetLogger().Debug("failed to parse request", zap.Error(err))
|
logger.Get().Debug("failed to parse request", zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
a.Ctx.StopWithError(iris.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -313,7 +313,7 @@ func (a *AuthController) PostLogout() {
|
||||||
claims, err := jwt.Verify(jwt.HS256, sharedKey, []byte(r.Token), blocklist)
|
claims, err := jwt.Verify(jwt.HS256, sharedKey, []byte(r.Token), blocklist)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "invalid token"
|
msg := "invalid token"
|
||||||
shared.GetLogger().Debug(msg, zap.Error(err))
|
logger.Get().Debug(msg, zap.Error(err))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -321,7 +321,7 @@ func (a *AuthController) PostLogout() {
|
||||||
err = blocklist.InvalidateToken(claims.Token, claims.StandardClaims)
|
err = blocklist.InvalidateToken(claims.Token, claims.StandardClaims)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := "failed to invalidate token"
|
msg := "failed to invalidate token"
|
||||||
shared.GetLogger().Error(msg, zap.Error(err), zap.String("token", hex.EncodeToString(claims.Token)))
|
logger.Get().Error(msg, zap.Error(err), zap.String("token", hex.EncodeToString(claims.Token)))
|
||||||
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
a.Ctx.StopWithError(iris.StatusBadRequest, errors.New(msg))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,8 @@ package controller
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"git.lumeweb.com/LumeWeb/portal/cid"
|
"git.lumeweb.com/LumeWeb/portal/cid"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
"git.lumeweb.com/LumeWeb/portal/service/files"
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
|
||||||
"github.com/kataras/iris/v12"
|
"github.com/kataras/iris/v12"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"io"
|
"io"
|
||||||
|
@ -22,28 +22,28 @@ func (f *FilesController) PostUpload() {
|
||||||
|
|
||||||
file, meta, err := f.Ctx.FormFile("file")
|
file, meta, err := f.Ctx.FormFile("file")
|
||||||
if internalErrorCustom(ctx, err, errors.New("invalid file data")) {
|
if internalErrorCustom(ctx, err, errors.New("invalid file data")) {
|
||||||
shared.GetLogger().Debug("invalid file data", zap.Error(err))
|
logger.Get().Debug("invalid file data", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := files.Upload(file, meta.Size, nil)
|
upload, err := files.Upload(file, meta.Size, nil)
|
||||||
|
|
||||||
if internalError(ctx, err) {
|
if internalError(ctx, err) {
|
||||||
shared.GetLogger().Debug("failed uploading file", zap.Error(err))
|
logger.Get().Debug("failed uploading file", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cidString, err := cid.EncodeString(upload.Hash, uint64(meta.Size))
|
cidString, err := cid.EncodeString(upload.Hash, uint64(meta.Size))
|
||||||
|
|
||||||
if internalError(ctx, err) {
|
if internalError(ctx, err) {
|
||||||
shared.GetLogger().Debug("failed creating cid", zap.Error(err))
|
logger.Get().Debug("failed creating cid", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ctx.JSON(&UploadResponse{Cid: cidString})
|
err = ctx.JSON(&UploadResponse{Cid: cidString})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to create response", zap.Error(err))
|
logger.Get().Error("failed to create response", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ func (f *FilesController) GetDownloadBy(cidString string) {
|
||||||
|
|
||||||
_, err := cid.Valid(cidString)
|
_, err := cid.Valid(cidString)
|
||||||
if sendError(ctx, err, iris.StatusBadRequest) {
|
if sendError(ctx, err, iris.StatusBadRequest) {
|
||||||
shared.GetLogger().Debug("invalid cid", zap.Error(err))
|
logger.Get().Debug("invalid cid", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ func (f *FilesController) GetDownloadBy(cidString string) {
|
||||||
hashHex := cidObject.StringHash()
|
hashHex := cidObject.StringHash()
|
||||||
download, err := files.Download(hashHex)
|
download, err := files.Download(hashHex)
|
||||||
if internalError(ctx, err) {
|
if internalError(ctx, err) {
|
||||||
shared.GetLogger().Debug("failed fetching file", zap.Error(err))
|
logger.Get().Debug("failed fetching file", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,7 +70,7 @@ func (f *FilesController) GetDownloadBy(cidString string) {
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if internalError(ctx, err) {
|
if internalError(ctx, err) {
|
||||||
shared.GetLogger().Debug("failed streaming file", zap.Error(err))
|
logger.Get().Debug("failed streaming file", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
package logger
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"log"
|
||||||
|
)
|
||||||
|
|
||||||
|
var logger *zap.Logger
|
||||||
|
|
||||||
|
func Init() {
|
||||||
|
newLogger, err := zap.NewProduction()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger = newLogger
|
||||||
|
}
|
||||||
|
|
||||||
|
func Get() *zap.Logger {
|
||||||
|
return logger
|
||||||
|
}
|
6
main.go
6
main.go
|
@ -6,9 +6,9 @@ import (
|
||||||
"git.lumeweb.com/LumeWeb/portal/controller"
|
"git.lumeweb.com/LumeWeb/portal/controller"
|
||||||
"git.lumeweb.com/LumeWeb/portal/db"
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
_ "git.lumeweb.com/LumeWeb/portal/docs"
|
_ "git.lumeweb.com/LumeWeb/portal/docs"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
"git.lumeweb.com/LumeWeb/portal/renterd"
|
"git.lumeweb.com/LumeWeb/portal/renterd"
|
||||||
"git.lumeweb.com/LumeWeb/portal/service/files"
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
|
||||||
"git.lumeweb.com/LumeWeb/portal/tus"
|
"git.lumeweb.com/LumeWeb/portal/tus"
|
||||||
"git.lumeweb.com/LumeWeb/portal/validator"
|
"git.lumeweb.com/LumeWeb/portal/validator"
|
||||||
"github.com/iris-contrib/swagger"
|
"github.com/iris-contrib/swagger"
|
||||||
|
@ -50,7 +50,7 @@ func main() {
|
||||||
|
|
||||||
renterd.Ready()
|
renterd.Ready()
|
||||||
|
|
||||||
shared.Init()
|
logger.Init()
|
||||||
files.Init()
|
files.Init()
|
||||||
|
|
||||||
// Create a new Iris app instance
|
// Create a new Iris app instance
|
||||||
|
@ -109,7 +109,7 @@ func main() {
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed starting webserver proof", zap.Error(err))
|
logger.Get().Error("Failed starting webserver proof", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
renterd.ShutdownComplete()
|
renterd.ShutdownComplete()
|
||||||
|
|
|
@ -9,4 +9,5 @@ type Tus struct {
|
||||||
ID uint64 `gorm:"primaryKey"`
|
ID uint64 `gorm:"primaryKey"`
|
||||||
UploadID string
|
UploadID string
|
||||||
Hash string
|
Hash string
|
||||||
|
Info string
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,10 +8,13 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.lumeweb.com/LumeWeb/portal/bao"
|
"git.lumeweb.com/LumeWeb/portal/bao"
|
||||||
"git.lumeweb.com/LumeWeb/portal/db"
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
"git.lumeweb.com/LumeWeb/portal/model"
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
"git.lumeweb.com/LumeWeb/portal/renterd"
|
"git.lumeweb.com/LumeWeb/portal/renterd"
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/tusstore"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
|
_ "github.com/tus/tusd/pkg/handler"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -32,13 +35,13 @@ func Upload(r io.ReadSeeker, size int64, hash []byte) (model.Upload, error) {
|
||||||
tree, hashBytes, err := bao.ComputeTree(r, size)
|
tree, hashBytes, err := bao.ComputeTree(r, size)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed to hash file", zap.Error(err))
|
logger.Get().Error("Failed to hash file", zap.Error(err))
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if hash != nil {
|
if hash != nil {
|
||||||
if bytes.Compare(hashBytes[:], hash) != 0 {
|
if bytes.Compare(hashBytes[:], hash) != 0 {
|
||||||
shared.GetLogger().Error("File hash does not match provided file hash")
|
logger.Get().Error("File hash does not match provided file hash")
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,20 +58,20 @@ func Upload(r io.ReadSeeker, size int64, hash []byte) (model.Upload, error) {
|
||||||
if (result.Error != nil && result.Error.Error() != "record not found") || result.RowsAffected > 0 {
|
if (result.Error != nil && result.Error.Error() != "record not found") || result.RowsAffected > 0 {
|
||||||
err := result.Row().Scan(&upload)
|
err := result.Row().Scan(&upload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed to query uploads table", zap.Error(err))
|
logger.Get().Error("Failed to query uploads table", zap.Error(err))
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if result.RowsAffected > 0 && upload.ID > 0 {
|
if result.RowsAffected > 0 && upload.ID > 0 {
|
||||||
shared.GetLogger().Info("Upload already exists")
|
logger.Get().Info("Upload already exists")
|
||||||
return upload, nil
|
return upload, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objectExistsResult, err := client.R().Get(fmt.Sprintf("/worker/objects/%s", hashHex))
|
objectExistsResult, err := client.R().Get(getBusObjectUrl(hashHex))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed query object", zap.Error(err))
|
logger.Get().Error("Failed query object", zap.Error(err))
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,17 +80,17 @@ func Upload(r io.ReadSeeker, size int64, hash []byte) (model.Upload, error) {
|
||||||
if objectStatusCode == 500 {
|
if objectStatusCode == 500 {
|
||||||
bodyErr := objectExistsResult.String()
|
bodyErr := objectExistsResult.String()
|
||||||
if !strings.Contains(bodyErr, "no slabs found") {
|
if !strings.Contains(bodyErr, "no slabs found") {
|
||||||
shared.GetLogger().Error("Failed fetching object", zap.String("error", objectExistsResult.String()))
|
logger.Get().Error("Failed fetching object", zap.String("error", objectExistsResult.String()))
|
||||||
return upload, errors.New(fmt.Sprintf("error fetching file: %s", objectExistsResult.String()))
|
return upload, errors.New(fmt.Sprintf("error fetching file: %s", objectExistsResult.String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
objectStatusCode = 404
|
objectStatusCode = 404
|
||||||
}
|
}
|
||||||
|
|
||||||
proofExistsResult, err := client.R().Get(fmt.Sprintf("/worker/objects/%s.obao", hashHex))
|
proofExistsResult, err := client.R().Get(getBusProofUrl(hashHex))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed query object proof", zap.Error(err))
|
logger.Get().Error("Failed query object proof", zap.Error(err))
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +99,7 @@ func Upload(r io.ReadSeeker, size int64, hash []byte) (model.Upload, error) {
|
||||||
if proofStatusCode == 500 {
|
if proofStatusCode == 500 {
|
||||||
bodyErr := proofExistsResult.String()
|
bodyErr := proofExistsResult.String()
|
||||||
if !strings.Contains(bodyErr, "no slabs found") {
|
if !strings.Contains(bodyErr, "no slabs found") {
|
||||||
shared.GetLogger().Error("Failed fetching object proof", zap.String("error", proofExistsResult.String()))
|
logger.Get().Error("Failed fetching object proof", zap.String("error", proofExistsResult.String()))
|
||||||
return upload, errors.New(fmt.Sprintf("error fetching file proof: %s", proofExistsResult.String()))
|
return upload, errors.New(fmt.Sprintf("error fetching file proof: %s", proofExistsResult.String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,20 +108,20 @@ func Upload(r io.ReadSeeker, size int64, hash []byte) (model.Upload, error) {
|
||||||
|
|
||||||
if objectStatusCode != 404 && proofStatusCode != 404 {
|
if objectStatusCode != 404 && proofStatusCode != 404 {
|
||||||
msg := "file already exists in network, but missing in database"
|
msg := "file already exists in network, but missing in database"
|
||||||
shared.GetLogger().Error(msg)
|
logger.Get().Error(msg)
|
||||||
return upload, errors.New(msg)
|
return upload, errors.New(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
ret, err := client.R().SetBody(r).Put(fmt.Sprintf("/worker/objects/%s", hashHex))
|
ret, err := client.R().SetBody(r).Put(getWorkerObjectUrl(hashHex))
|
||||||
if ret.StatusCode() != 200 {
|
if ret.StatusCode() != 200 {
|
||||||
shared.GetLogger().Error("Failed uploading object", zap.String("error", ret.String()))
|
logger.Get().Error("Failed uploading object", zap.String("error", ret.String()))
|
||||||
err = errors.New(ret.String())
|
err = errors.New(ret.String())
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ret, err = client.R().SetBody(tree).Put(fmt.Sprintf("/worker/objects/%s.obao", hashHex))
|
ret, err = client.R().SetBody(tree).Put(getWorkerProofUrl(hashHex))
|
||||||
if ret.StatusCode() != 200 {
|
if ret.StatusCode() != 200 {
|
||||||
shared.GetLogger().Error("Failed uploading proof", zap.String("error", ret.String()))
|
logger.Get().Error("Failed uploading proof", zap.String("error", ret.String()))
|
||||||
err = errors.New(ret.String())
|
err = errors.New(ret.String())
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
@ -128,7 +131,7 @@ func Upload(r io.ReadSeeker, size int64, hash []byte) (model.Upload, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = db.Get().Create(&upload).Error; err != nil {
|
if err = db.Get().Create(&upload).Error; err != nil {
|
||||||
shared.GetLogger().Error("Failed adding upload to db", zap.Error(err))
|
logger.Get().Error("Failed adding upload to db", zap.Error(err))
|
||||||
return upload, err
|
return upload, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,7 +144,7 @@ func Download(hash string) (io.Reader, error) {
|
||||||
if uploadItem.Err() == nil {
|
if uploadItem.Err() == nil {
|
||||||
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
|
fetch, err := client.R().SetDoNotParseResponse(true).Get(fmt.Sprintf("/worker/objects/%s", hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed downloading object", zap.Error(err))
|
logger.Get().Error("Failed downloading object", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,25 +153,64 @@ func Download(hash string) (io.Reader, error) {
|
||||||
var tusData model.Tus
|
var tusData model.Tus
|
||||||
err := tusItem.Scan(&tusData)
|
err := tusItem.Scan(&tusData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed querying upload from db", zap.Error(err))
|
logger.Get().Error("Failed querying upload from db", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
upload, err := shared.GetTusStore().GetUpload(context.Background(), tusData.UploadID)
|
upload, err := getStore().GetUpload(context.Background(), tusData.UploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed querying tus upload", zap.Error(err))
|
logger.Get().Error("Failed querying tus upload", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
reader, err := upload.GetReader(context.Background())
|
reader, err := upload.GetReader(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("Failed reading tus upload", zap.Error(err))
|
logger.Get().Error("Failed reading tus upload", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return reader, nil
|
return reader, nil
|
||||||
} else {
|
} else {
|
||||||
shared.GetLogger().Error("invalid file")
|
logger.Get().Error("invalid file")
|
||||||
return nil, errors.New("invalid file")
|
return nil, errors.New("invalid file")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func objectUrlBuilder(hash string, bus bool, proof bool) string {
|
||||||
|
path := []string{}
|
||||||
|
if bus {
|
||||||
|
path = append(path, "bus")
|
||||||
|
} else {
|
||||||
|
path = append(path, "worker")
|
||||||
|
}
|
||||||
|
|
||||||
|
path = append(path, "objects")
|
||||||
|
|
||||||
|
name := "%s"
|
||||||
|
|
||||||
|
if proof {
|
||||||
|
name = name + ".obao"
|
||||||
|
}
|
||||||
|
|
||||||
|
path = append(path, name)
|
||||||
|
|
||||||
|
return fmt.Sprintf(strings.Join(path, "/"), hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getBusObjectUrl(hash string) string {
|
||||||
|
return objectUrlBuilder(hash, true, false)
|
||||||
|
}
|
||||||
|
func getWorkerObjectUrl(hash string) string {
|
||||||
|
return objectUrlBuilder(hash, false, false)
|
||||||
|
}
|
||||||
|
func getWorkerProofUrl(hash string) string {
|
||||||
|
return objectUrlBuilder(hash, false, true)
|
||||||
|
}
|
||||||
|
func getBusProofUrl(hash string) string {
|
||||||
|
return objectUrlBuilder(hash, true, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getStore() *tusstore.DbFileStore {
|
||||||
|
ret := shared.GetTusStore()
|
||||||
|
return (*ret).(*tusstore.DbFileStore)
|
||||||
|
}
|
||||||
|
|
|
@ -1,53 +1,45 @@
|
||||||
package shared
|
package shared
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/golang-queue/queue"
|
|
||||||
"github.com/tus/tusd/pkg/filestore"
|
|
||||||
tusd "github.com/tus/tusd/pkg/handler"
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
"go.uber.org/zap"
|
|
||||||
_ "go.uber.org/zap"
|
_ "go.uber.org/zap"
|
||||||
"log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var tusQueue *queue.Queue
|
type TusFunc func(upload *tusd.Upload) error
|
||||||
var tusStore *filestore.FileStore
|
|
||||||
var tusComposer *tusd.StoreComposer
|
|
||||||
var logger *zap.Logger
|
|
||||||
|
|
||||||
func SetTusQueue(q *queue.Queue) {
|
var tusQueue *interface{}
|
||||||
tusQueue = q
|
var tusStore *interface{}
|
||||||
|
var tusComposer *interface{}
|
||||||
|
var tusWorker TusFunc
|
||||||
|
|
||||||
|
func SetTusQueue(q interface{}) {
|
||||||
|
tusQueue = &q
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTusQueue() *queue.Queue {
|
func GetTusQueue() *interface{} {
|
||||||
return tusQueue
|
return tusQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetTusStore(s *filestore.FileStore) {
|
func SetTusStore(s interface{}) {
|
||||||
tusStore = s
|
tusStore = &s
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTusStore() *filestore.FileStore {
|
func GetTusStore() *interface{} {
|
||||||
return tusStore
|
return tusStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetTusComposer(c *tusd.StoreComposer) {
|
func SetTusComposer(c interface{}) {
|
||||||
tusComposer = c
|
tusComposer = &c
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTusComposer() *tusd.StoreComposer {
|
func GetTusComposer() *interface{} {
|
||||||
return tusComposer
|
return tusComposer
|
||||||
}
|
}
|
||||||
|
|
||||||
func Init() {
|
func SetTusWorker(w TusFunc) {
|
||||||
newLogger, err := zap.NewProduction()
|
tusWorker = w
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger = newLogger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetLogger() *zap.Logger {
|
func GetTusWorker() TusFunc {
|
||||||
return logger
|
return tusWorker
|
||||||
}
|
}
|
||||||
|
|
86
tus/tus.go
86
tus/tus.go
|
@ -7,11 +7,12 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"git.lumeweb.com/LumeWeb/portal/cid"
|
"git.lumeweb.com/LumeWeb/portal/cid"
|
||||||
"git.lumeweb.com/LumeWeb/portal/db"
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
"git.lumeweb.com/LumeWeb/portal/model"
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
"git.lumeweb.com/LumeWeb/portal/service/files"
|
"git.lumeweb.com/LumeWeb/portal/service/files"
|
||||||
"git.lumeweb.com/LumeWeb/portal/shared"
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/tusstore"
|
||||||
"github.com/golang-queue/queue"
|
"github.com/golang-queue/queue"
|
||||||
"github.com/tus/tusd/pkg/filestore"
|
|
||||||
tusd "github.com/tus/tusd/pkg/handler"
|
tusd "github.com/tus/tusd/pkg/handler"
|
||||||
"github.com/tus/tusd/pkg/memorylocker"
|
"github.com/tus/tusd/pkg/memorylocker"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -24,7 +25,7 @@ const TUS_API_PATH = "/files/tus"
|
||||||
const HASH_META_HEADER = "blake3-hash"
|
const HASH_META_HEADER = "blake3-hash"
|
||||||
|
|
||||||
func Init() *tusd.Handler {
|
func Init() *tusd.Handler {
|
||||||
store := &filestore.FileStore{
|
store := &tusstore.DbFileStore{
|
||||||
Path: "/tmp",
|
Path: "/tmp",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +46,7 @@ func Init() *tusd.Handler {
|
||||||
|
|
||||||
if len(hash) == 0 {
|
if len(hash) == 0 {
|
||||||
msg := "missing blake3-hash metadata"
|
msg := "missing blake3-hash metadata"
|
||||||
shared.GetLogger().Debug(msg)
|
logger.Get().Debug(msg)
|
||||||
return errors.New(msg)
|
return errors.New(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,52 +55,27 @@ func Init() *tusd.Handler {
|
||||||
if (result.Error != nil && result.Error.Error() != "record not found") || result.RowsAffected > 0 {
|
if (result.Error != nil && result.Error.Error() != "record not found") || result.RowsAffected > 0 {
|
||||||
hashBytes, err := hex.DecodeString(hash)
|
hashBytes, err := hex.DecodeString(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Debug("invalid hash", zap.Error(err))
|
logger.Get().Debug("invalid hash", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cidString, err := cid.Encode(hashBytes, uint64(hook.Upload.Size))
|
cidString, err := cid.Encode(hashBytes, uint64(hook.Upload.Size))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Debug("failed to create cid", zap.Error(err))
|
logger.Get().Debug("failed to create cid", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := json.Marshal(UploadResponse{Cid: cidString})
|
resp, err := json.Marshal(UploadResponse{Cid: cidString})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to create response", zap.Error(err))
|
logger.Get().Error("failed to create response", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return tusd.NewHTTPError(errors.New(string(resp)), 304)
|
return tusd.NewHTTPError(errors.New(string(resp)), 304)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
PreFinishResponseCallback: func(hook tusd.HookEvent) error {
|
|
||||||
tusEntry := &model.Tus{
|
|
||||||
UploadID: hook.Upload.ID,
|
|
||||||
Hash: hook.Upload.MetaData[HASH_META_HEADER],
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := db.Get().Create(tusEntry).Error; err != nil {
|
|
||||||
shared.GetLogger().Error("failed to create tus entry", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := shared.GetTusQueue().QueueTask(func(ctx context.Context) error {
|
|
||||||
upload, err := store.GetUpload(nil, hook.Upload.ID)
|
|
||||||
if err != nil {
|
|
||||||
shared.GetLogger().Error("failed to query tus upload", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return tusWorker(&upload)
|
|
||||||
}); err != nil {
|
|
||||||
shared.GetLogger().Error("failed to queue tus upload", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
@ -107,7 +83,10 @@ func Init() *tusd.Handler {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
shared.SetTusQueue(queue.NewPool(5))
|
pool := queue.NewPool(5)
|
||||||
|
|
||||||
|
shared.SetTusQueue(pool)
|
||||||
|
shared.SetTusWorker(tusWorker)
|
||||||
|
|
||||||
go tusStartup()
|
go tusStartup()
|
||||||
|
|
||||||
|
@ -118,14 +97,14 @@ func tusStartup() {
|
||||||
result := map[int]model.Tus{}
|
result := map[int]model.Tus{}
|
||||||
db.Get().Table("tus").Take(&result)
|
db.Get().Table("tus").Take(&result)
|
||||||
|
|
||||||
tusQueue := shared.GetTusQueue()
|
tusQueue := getQueue()
|
||||||
store := shared.GetTusStore()
|
store := getStore()
|
||||||
|
|
||||||
for _, item := range result {
|
for _, item := range result {
|
||||||
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
if err := tusQueue.QueueTask(func(ctx context.Context) error {
|
||||||
upload, err := store.GetUpload(nil, item.UploadID)
|
upload, err := store.GetUpload(nil, item.UploadID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to query tus upload", zap.Error(err))
|
logger.Get().Error("failed to query tus upload", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return tusWorker(&upload)
|
return tusWorker(&upload)
|
||||||
|
@ -138,12 +117,12 @@ func tusStartup() {
|
||||||
func tusWorker(upload *tusd.Upload) error {
|
func tusWorker(upload *tusd.Upload) error {
|
||||||
info, err := (*upload).GetInfo(context.Background())
|
info, err := (*upload).GetInfo(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed to query tus upload metadata", zap.Error(err))
|
logger.Get().Error("failed to query tus upload metadata", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
file, err := (*upload).GetReader(context.Background())
|
file, err := (*upload).GetReader(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed reading upload", zap.Error(err))
|
logger.Get().Error("failed reading upload", zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +131,7 @@ func tusWorker(upload *tusd.Upload) error {
|
||||||
hashBytes, err := hex.DecodeString(hashHex)
|
hashBytes, err := hex.DecodeString(hashHex)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed decoding hash", zap.Error(err))
|
logger.Get().Error("failed decoding hash", zap.Error(err))
|
||||||
tErr := terminateUpload(*upload)
|
tErr := terminateUpload(*upload)
|
||||||
|
|
||||||
if tErr != nil {
|
if tErr != nil {
|
||||||
|
@ -176,25 +155,12 @@ func tusWorker(upload *tusd.Upload) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func terminateUpload(upload tusd.Upload) error {
|
func terminateUpload(upload tusd.Upload) error {
|
||||||
info, _ := upload.GetInfo(context.Background())
|
err := getComposer().Terminater.AsTerminatableUpload(upload).Terminate(context.Background())
|
||||||
err := shared.GetTusComposer().Terminater.AsTerminatableUpload(upload).Terminate(context.Background())
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
shared.GetLogger().Error("failed deleting tus upload", zap.Error(err))
|
logger.Get().Error("failed deleting tus upload", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
tusUpload := &model.Tus{UploadID: info.ID}
|
|
||||||
ret := db.Get().Where(tusUpload).First(&tusUpload)
|
|
||||||
|
|
||||||
if ret.Error != nil && ret.Error.Error() != "record not found" {
|
|
||||||
shared.GetLogger().Error("failed fetching tus entry", zap.Error(err))
|
|
||||||
err = ret.Error
|
|
||||||
}
|
|
||||||
|
|
||||||
err1 := db.Get().Where(&tusUpload).Delete(&tusUpload)
|
|
||||||
|
|
||||||
_ = err1
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -205,3 +171,17 @@ func terminateUpload(upload tusd.Upload) error {
|
||||||
type UploadResponse struct {
|
type UploadResponse struct {
|
||||||
Cid string `json:"cid"`
|
Cid string `json:"cid"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getQueue() *queue.Queue {
|
||||||
|
ret := shared.GetTusQueue()
|
||||||
|
return (*ret).(*queue.Queue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getStore() *tusstore.DbFileStore {
|
||||||
|
ret := shared.GetTusStore()
|
||||||
|
return (*ret).(*tusstore.DbFileStore)
|
||||||
|
}
|
||||||
|
func getComposer() *tusd.StoreComposer {
|
||||||
|
ret := shared.GetTusComposer()
|
||||||
|
return (*ret).(*tusd.StoreComposer)
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,305 @@
|
||||||
|
package tusstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/db"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/logger"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/model"
|
||||||
|
"git.lumeweb.com/LumeWeb/portal/shared"
|
||||||
|
"github.com/golang-queue/queue"
|
||||||
|
"github.com/tus/tusd/pkg/handler"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
var defaultFilePerm = os.FileMode(0664)
|
||||||
|
|
||||||
|
type DbFileStore struct {
|
||||||
|
// Relative or absolute path to store files in. DbFileStore does not check
|
||||||
|
// whether the path exists, use os.MkdirAll in this case on your own.
|
||||||
|
Path string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) UseIn(composer *handler.StoreComposer) {
|
||||||
|
composer.UseCore(store)
|
||||||
|
composer.UseTerminater(store)
|
||||||
|
composer.UseConcater(store)
|
||||||
|
composer.UseLengthDeferrer(store)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
|
||||||
|
if info.ID == "" {
|
||||||
|
info.ID = uid()
|
||||||
|
}
|
||||||
|
binPath := store.binPath(info.ID)
|
||||||
|
info.Storage = map[string]string{
|
||||||
|
"Type": "dbstore",
|
||||||
|
"Path": binPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create binary file with no content
|
||||||
|
file, err := os.OpenFile(binPath, os.O_CREATE|os.O_WRONLY, defaultFilePerm)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
err = fmt.Errorf("upload directory does not exist: %s", store.Path)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = file.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
upload := &fileUpload{
|
||||||
|
info: info,
|
||||||
|
binPath: binPath,
|
||||||
|
hash: info.MetaData["blake3-hash"],
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeInfo creates the file by itself if necessary
|
||||||
|
err = upload.writeInfo()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return upload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) GetUpload(ctx context.Context, id string) (handler.Upload, error) {
|
||||||
|
info := handler.FileInfo{
|
||||||
|
ID: id,
|
||||||
|
}
|
||||||
|
|
||||||
|
fUpload := &fileUpload{info: info}
|
||||||
|
|
||||||
|
record, is404, err := fUpload.getInfo()
|
||||||
|
if err != nil {
|
||||||
|
if is404 {
|
||||||
|
// Interpret os.ErrNotExist as 404 Not Found
|
||||||
|
err = handler.ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(record.Info), &info); err != nil {
|
||||||
|
logger.Get().Error("fail to parse upload meta", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fUpload.info = info
|
||||||
|
|
||||||
|
fUpload.hash = record.Hash
|
||||||
|
binPath := store.binPath(id)
|
||||||
|
stat, err := os.Stat(binPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
// Interpret os.ErrNotExist as 404 Not Found
|
||||||
|
err = handler.ErrNotFound
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
info.Offset = stat.Size()
|
||||||
|
|
||||||
|
fUpload.binPath = binPath
|
||||||
|
|
||||||
|
return fUpload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
|
||||||
|
return upload.(*fileUpload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload {
|
||||||
|
return upload.(*fileUpload)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (store DbFileStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
|
||||||
|
return upload.(*fileUpload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// binPath returns the path to the file storing the binary data.
|
||||||
|
func (store DbFileStore) binPath(id string) string {
|
||||||
|
return filepath.Join(store.Path, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
type fileUpload struct {
|
||||||
|
// info stores the current information about the upload
|
||||||
|
info handler.FileInfo
|
||||||
|
// binPath is the path to the binary file (which has no extension)
|
||||||
|
binPath string
|
||||||
|
hash string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) GetInfo(ctx context.Context) (handler.FileInfo, error) {
|
||||||
|
return upload.info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
|
||||||
|
file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
n, err := io.Copy(file, src)
|
||||||
|
|
||||||
|
upload.info.Offset += n
|
||||||
|
|
||||||
|
err = upload.writeInfo()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) GetReader(ctx context.Context) (io.Reader, error) {
|
||||||
|
return os.Open(upload.binPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) Terminate(ctx context.Context) error {
|
||||||
|
tusUpload := &model.Tus{
|
||||||
|
UploadID: upload.info.ID,
|
||||||
|
}
|
||||||
|
|
||||||
|
ret := db.Get().Where(&tusUpload).Delete(&tusUpload)
|
||||||
|
|
||||||
|
if ret.Error != nil {
|
||||||
|
logger.Get().Error("failed to delete tus entry", zap.Error(ret.Error))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Remove(upload.binPath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []handler.Upload) (err error) {
|
||||||
|
file, err := os.OpenFile(upload.binPath, os.O_WRONLY|os.O_APPEND, defaultFilePerm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
for _, partialUpload := range uploads {
|
||||||
|
fileUpload := partialUpload.(*fileUpload)
|
||||||
|
|
||||||
|
src, err := os.Open(fileUpload.binPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := io.Copy(file, src); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) DeclareLength(ctx context.Context, length int64) error {
|
||||||
|
upload.info.Size = length
|
||||||
|
upload.info.SizeIsDeferred = false
|
||||||
|
return upload.writeInfo()
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeInfo updates the entire information. Everything will be overwritten.
|
||||||
|
func (upload *fileUpload) writeInfo() error {
|
||||||
|
data, err := json.Marshal(upload.info)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tusRecord, is404, err := upload.getInfo()
|
||||||
|
|
||||||
|
if err != nil && !is404 {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tusRecord != nil {
|
||||||
|
tusRecord.Info = string(data)
|
||||||
|
if ret := db.Get().Save(&tusRecord); ret.Error != nil {
|
||||||
|
logger.Get().Error("failed to update tus entry", zap.Error(ret.Error))
|
||||||
|
|
||||||
|
return ret.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tusRecord = &model.Tus{UploadID: upload.info.ID, Hash: upload.hash, Info: string(data)}
|
||||||
|
|
||||||
|
if ret := db.Get().Create(&tusRecord); ret.Error != nil {
|
||||||
|
logger.Get().Error("failed to create tus entry", zap.Error(ret.Error))
|
||||||
|
|
||||||
|
return ret.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) getInfo() (*model.Tus, bool, error) {
|
||||||
|
var tusRecord model.Tus
|
||||||
|
|
||||||
|
result := db.Get().Where(&model.Tus{UploadID: upload.info.ID}).First(&tusRecord)
|
||||||
|
|
||||||
|
if result.Error != nil && result.Error.Error() != "record not found" {
|
||||||
|
logger.Get().Error("failed to query tus entry", zap.Error(result.Error))
|
||||||
|
return nil, false, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
return nil, true, result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return &tusRecord, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (upload *fileUpload) FinishUpload(ctx context.Context) error {
|
||||||
|
if err := getQueue().QueueTask(func(ctx context.Context) error {
|
||||||
|
upload, err := getStore().GetUpload(nil, upload.info.ID)
|
||||||
|
if err != nil {
|
||||||
|
logger.Get().Error("failed to query tus upload", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return shared.GetTusWorker()(&upload)
|
||||||
|
}); err != nil {
|
||||||
|
logger.Get().Error("failed to queue tus upload", zap.Error(err))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func uid() string {
|
||||||
|
id := make([]byte, 16)
|
||||||
|
_, err := io.ReadFull(rand.Reader, id)
|
||||||
|
if err != nil {
|
||||||
|
// This is probably an appropriate way to handle errors from our source
|
||||||
|
// for random bits.
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return hex.EncodeToString(id)
|
||||||
|
}
|
||||||
|
func getQueue() *queue.Queue {
|
||||||
|
ret := shared.GetTusQueue()
|
||||||
|
return (*ret).(*queue.Queue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getStore() *DbFileStore {
|
||||||
|
ret := shared.GetTusStore()
|
||||||
|
return (*ret).(*DbFileStore)
|
||||||
|
}
|
Loading…
Reference in New Issue