portal/api/s5/s5.go

2337 lines
61 KiB
Go

package s5
import (
"bytes"
"context"
"crypto/ed25519"
"crypto/rand"
_ "embed"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"mime"
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"slices"
"strconv"
"strings"
"github.com/gabriel-vasile/mimetype"
_import "git.lumeweb.com/LumeWeb/portal/import"
"git.lumeweb.com/LumeWeb/portal/api/router"
"git.lumeweb.com/LumeWeb/portal/bao"
"git.lumeweb.com/LumeWeb/portal/renter"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"git.lumeweb.com/LumeWeb/portal/cron"
"git.lumeweb.com/LumeWeb/portal/config"
"git.lumeweb.com/LumeWeb/portal/api/swagger"
"git.lumeweb.com/LumeWeb/portal/metadata"
"git.lumeweb.com/LumeWeb/portal/storage"
"git.lumeweb.com/LumeWeb/libs5-go/encoding"
s5libmetadata "git.lumeweb.com/LumeWeb/libs5-go/metadata"
"git.lumeweb.com/LumeWeb/libs5-go/node"
"git.lumeweb.com/LumeWeb/libs5-go/protocol"
"git.lumeweb.com/LumeWeb/libs5-go/service"
storage2 "git.lumeweb.com/LumeWeb/libs5-go/storage"
"git.lumeweb.com/LumeWeb/libs5-go/storage/provider"
"git.lumeweb.com/LumeWeb/libs5-go/types"
"git.lumeweb.com/LumeWeb/portal/db/models"
"github.com/samber/lo"
"github.com/vmihailenco/msgpack/v5"
"go.uber.org/zap"
"gorm.io/gorm"
"nhooyr.io/websocket"
"github.com/julienschmidt/httprouter"
"git.lumeweb.com/LumeWeb/portal/account"
"git.lumeweb.com/LumeWeb/portal/api/middleware"
"git.lumeweb.com/LumeWeb/portal/api/registry"
protoRegistry "git.lumeweb.com/LumeWeb/portal/protocols/registry"
"git.lumeweb.com/LumeWeb/portal/protocols/s5"
"github.com/ddo/rq"
dnslink "github.com/dnslink-std/go"
"github.com/golang-queue/queue"
"github.com/rs/cors"
"go.sia.tech/jape"
"go.uber.org/fx"
)
var (
_ registry.API = (*S5API)(nil)
_ router.RoutableAPI = (*S5API)(nil)
)
//go:embed swagger.yaml
var swagSpec []byte
type S5API struct {
config *config.Manager
identity ed25519.PrivateKey
accounts *account.AccountServiceDefault
storage storage.StorageService
metadata metadata.MetadataService
db *gorm.DB
protocols []protoRegistry.Protocol
protocol *s5.S5Protocol
logger *zap.Logger
tusHandler *s5.TusHandler
cron *cron.CronServiceDefault
_import _import.ImportService
}
type APIParams struct {
fx.In
Config *config.Manager
Identity ed25519.PrivateKey
Accounts *account.AccountServiceDefault
Storage storage.StorageService
Metadata metadata.MetadataService
Db *gorm.DB
Protocols []protoRegistry.Protocol `group:"protocol"`
Logger *zap.Logger
TusHandler *s5.TusHandler
Cron *cron.CronServiceDefault
Import _import.ImportService
}
type S5ApiResult struct {
fx.Out
API registry.API `group:"api"`
S5API *S5API
}
func NewS5(params APIParams) (S5ApiResult, error) {
api := &S5API{
config: params.Config,
identity: params.Identity,
accounts: params.Accounts,
storage: params.Storage,
metadata: params.Metadata,
db: params.Db,
protocols: params.Protocols,
logger: params.Logger,
tusHandler: params.TusHandler,
cron: params.Cron,
_import: params.Import,
}
return S5ApiResult{
API: api,
S5API: api,
}, nil
}
var Module = fx.Module("s5_api",
fx.Provide(NewS5),
)
func (s *S5API) Init() error {
s5protocol := protoRegistry.FindProtocolByName("s5", s.protocols)
if s5protocol == nil {
return fmt.Errorf("s5 protocol not found")
}
s5protocolInstance := s5protocol.(*s5.S5Protocol)
s.protocol = s5protocolInstance
return nil
}
func (s S5API) Name() string {
return "s5"
}
func (s S5API) Start(ctx context.Context) error {
return s.protocol.Node().Start(ctx)
}
func (s S5API) Stop(ctx context.Context) error {
return nil
}
func (s *S5API) Routes() (*httprouter.Router, error) {
authMiddlewareOpts := middleware.AuthMiddlewareOptions{
Identity: s.identity,
Accounts: s.accounts,
Config: s.config,
Purpose: account.JWTPurposeLogin,
}
authMw := authMiddleware(authMiddlewareOpts)
tusHandler := BuildS5TusApi(authMw, s.tusHandler)
corsOptionsHandler := func(c jape.Context) {
c.ResponseWriter.WriteHeader(http.StatusOK)
}
tusCors := BuildTusCors()
wrappedTusHandler := middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, tusCors, authMw)
debugCors := cors.Default()
defaultCors := cors.New(cors.Options{
AllowOriginFunc: func(origin string) bool {
return true
},
AllowedMethods: []string{"POST", "GET", "DELETE"},
AllowedHeaders: []string{"Authorization", "Content-Type"},
AllowCredentials: true,
})
routes := map[string]jape.Handler{
// Account API
"GET /s5/account/register": middleware.ApplyMiddlewares(s.accountRegisterChallenge, middleware.ProxyMiddleware),
"POST /s5/account/register": middleware.ApplyMiddlewares(s.accountRegister, middleware.ProxyMiddleware),
"GET /s5/account/login": middleware.ApplyMiddlewares(s.accountLoginChallenge, middleware.ProxyMiddleware),
"POST /s5/account/login": middleware.ApplyMiddlewares(s.accountLogin, middleware.ProxyMiddleware),
"GET /s5/account": middleware.ApplyMiddlewares(s.accountInfo, middleware.ProxyMiddleware, authMw),
"GET /s5/account/stats": middleware.ApplyMiddlewares(s.accountStats, middleware.ProxyMiddleware, authMw),
"GET /s5/account/pins.bin": middleware.ApplyMiddlewares(s.accountPinsBinary, middleware.ProxyMiddleware, authMw),
"GET /s5/account/pins": middleware.ApplyMiddlewares(s.accountPins, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"OPTIONS /s5/account/pins": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
// Upload API
"POST /s5/upload": middleware.ApplyMiddlewares(s.smallFileUpload, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"POST /s5/upload/directory": middleware.ApplyMiddlewares(s.directoryUpload, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"OPTIONS /s5/upload": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"OPTIONS /s5/upload/directory": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
// Tus API
"POST /s5/upload/tus": tusHandler,
"OPTIONS /s5/upload/tus": wrappedTusHandler,
"HEAD /s5/upload/tus/:id": tusHandler,
"POST /s5/upload/tus/:id": tusHandler,
"PATCH /s5/upload/tus/:id": tusHandler,
"OPTIONS /s5/upload/tus/:id": wrappedTusHandler,
// Download API
"GET /s5/blob/:cid": middleware.ApplyMiddlewares(s.downloadBlob, middleware.ProxyMiddleware, authMw),
"GET /s5/metadata/:cid": s.downloadMetadata,
"GET /s5/download/:cid": middleware.ApplyMiddlewares(s.downloadFile, middleware.ProxyMiddleware, defaultCors.Handler),
"OPTIONS /s5/blob/:cid": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"OPTIONS /s5/metadata/:cid": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler),
"OPTIONS /s5/download/:cid": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler),
// Pins API
"POST /s5/pin/:cid": middleware.ApplyMiddlewares(s.accountPin, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"GET /s5/pin/:cid/status": middleware.ApplyMiddlewares(s.accountPinStatus, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"DELETE /s5/delete/:cid": middleware.ApplyMiddlewares(s.accountPinDelete, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"OPTIONS /s5/pin/:cid": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"OPTIONS /s5/delete/:cid": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
"OPTIONS /s5/pin/:cid/status": middleware.ApplyMiddlewares(corsOptionsHandler, middleware.ProxyMiddleware, defaultCors.Handler, authMw),
// Debug API
"GET /s5/debug/download_urls/:cid": middleware.ApplyMiddlewares(s.debugDownloadUrls, middleware.ProxyMiddleware, debugCors.Handler),
"GET /s5/debug/storage_locations/:hash": middleware.ApplyMiddlewares(s.debugStorageLocations, middleware.ProxyMiddleware, debugCors.Handler),
// Registry API
"GET /s5/registry": middleware.ApplyMiddlewares(s.registryQuery, middleware.ProxyMiddleware, authMw),
"POST /s5/registry": middleware.ApplyMiddlewares(s.registrySet, middleware.ProxyMiddleware, authMw),
"GET /s5/registry/subscription": middleware.ApplyMiddlewares(s.registrySubscription, middleware.ProxyMiddleware, authMw),
}
routes, err := swagger.Swagger(swagSpec, routes)
if err != nil {
return nil, err
}
return s.protocol.Node().Services().HTTP().GetHttpRouter(routes), nil
}
func (s *S5API) Can(w http.ResponseWriter, r *http.Request) bool {
host := r.Host
if strings.Contains(host, ":") {
host = strings.Split(host, ":")[0]
}
resolve, err := dnslink.Resolve(host)
if err != nil {
return false
}
if _, ok := resolve.Links[s.Name()]; !ok {
return false
}
decodedCid, err := encoding.CIDFromString(resolve.Links[s.Name()][0].Identifier)
if err != nil {
s.logger.Error("Error decoding CID", zap.Error(err))
return false
}
hash := decodedCid.Hash.HashBytes()
upload, err := s.metadata.GetUpload(r.Context(), hash)
if err != nil {
return false
}
if upload.Protocol != s.Name() {
return false
}
exists, _, err := s.accounts.DNSLinkExists(hash)
if err != nil {
return false
}
if !exists {
return false
}
ctx := context.WithValue(r.Context(), "cid", decodedCid)
*r = *r.WithContext(ctx)
return true
}
func (s *S5API) Handle(w http.ResponseWriter, r *http.Request) {
cidVal := r.Context().Value("cid")
if cidVal == nil {
w.WriteHeader(http.StatusNotFound)
return
}
cid := cidVal.(*encoding.CID)
if cid.Type == types.CIDTypeResolver {
entry, err := s.getNode().Services().Registry().Get(cid.Hash.FullBytes())
if err != nil {
s.logger.Error("Error getting registry entry", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
cid, err = encoding.CIDFromRegistry(entry.Data())
if err != nil {
s.logger.Error("Error getting CID from registry entry", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
}
switch cid.Type {
case types.CIDTypeRaw:
s.handleDnsLinkRaw(w, r, cid)
case types.CIDTypeMetadataWebapp:
s.handleDnsLinkWebapp(w, r, cid)
case types.CIDTypeDirectory:
s.handleDnsLinkDirectory(w, r, cid)
default:
w.WriteHeader(http.StatusUnsupportedMediaType)
}
}
func (s *S5API) handleDnsLinkRaw(w http.ResponseWriter, r *http.Request, cid *encoding.CID) {
file := s.newFile(FileParams{
Hash: cid.Hash.HashBytes(),
Type: cid.Type,
})
if !file.Exists() {
w.WriteHeader(http.StatusNotFound)
return
}
defer func(file io.ReadCloser) {
err := file.Close()
if err != nil {
s.logger.Error("error closing file", zap.Error(err))
}
}(file)
w.Header().Set("Content-Type", file.Mime())
http.ServeContent(w, r, file.Name(), file.Modtime(), file)
}
func (s *S5API) handleDnsLinkWebapp(w http.ResponseWriter, r *http.Request, cid *encoding.CID) {
http.FileServer(http.FS(newWebAppFs(cid, s))).ServeHTTP(w, r)
}
func (s *S5API) handleDnsLinkDirectory(w http.ResponseWriter, r *http.Request, cid *encoding.CID) {
http.FileServer(http.FS(newDirFs(cid, s))).ServeHTTP(w, r)
}
type s5TusJwtResponseWriter struct {
http.ResponseWriter
req *http.Request
}
func (w *s5TusJwtResponseWriter) WriteHeader(statusCode int) {
// Check if this is the specific route and status
if statusCode == http.StatusCreated {
location := w.Header().Get("Location")
authToken := middleware.ParseAuthTokenHeader(w.req.Header)
if authToken != "" && location != "" {
parsedUrl, _ := url.Parse(location)
query := parsedUrl.Query()
query.Set("auth_token", authToken)
parsedUrl.RawQuery = query.Encode()
w.Header().Set("Location", parsedUrl.String())
}
}
w.ResponseWriter.WriteHeader(statusCode)
}
func BuildTusCors() func(h http.Handler) http.Handler {
mw :=
cors.New(cors.Options{
AllowOriginFunc: func(origin string) bool {
return true
},
AllowedMethods: []string{"GET", "POST", "PATCH", "DELETE", "HEAD", "OPTIONS"},
AllowedHeaders: []string{
"Authorization",
"Expires",
"Upload-Concat",
"Upload-Length",
"Upload-Metadata",
"Upload-Offset",
"X-Requested-With",
"Tus-Version",
"Tus-Resumable",
"Tus-Extension",
"Tus-Max-Size",
"X-HTTP-Method-Override",
"Content-Type",
},
AllowCredentials: true,
})
return mw.Handler
}
func BuildS5TusApi(authMw middleware.HttpMiddlewareFunc, handler *s5.TusHandler) jape.Handler {
// Create a jape.Handler for your tusHandler
tusJapeHandler := func(c jape.Context) {
tusHandler := handler.Tus()
tusHandler.ServeHTTP(c.ResponseWriter, c.Request)
}
protocolMiddleware := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), "protocol", "s5")
next.ServeHTTP(w, r.WithContext(ctx))
})
}
stripPrefix := func(next http.Handler) http.Handler {
return http.StripPrefix("/s5/upload/tus", next)
}
injectJwt := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := w
if r.Method == http.MethodPost && r.URL.Path == "/s5/upload/tus" {
res = &s5TusJwtResponseWriter{ResponseWriter: w, req: r}
}
next.ServeHTTP(res, r)
})
}
// Apply the middlewares to the tusJapeHandler
tusHandler := middleware.ApplyMiddlewares(tusJapeHandler, BuildTusCors(), authMw, injectJwt, protocolMiddleware, stripPrefix, middleware.ProxyMiddleware)
return tusHandler
}
type readSeekNopCloser struct {
*bytes.Reader
}
func (rsnc readSeekNopCloser) Close() error {
return nil
}
func (s *S5API) smallFileUpload(jc jape.Context) {
user := middleware.GetUserFromContext(jc.Request.Context())
file, size, err := s.prepareFileUpload(jc)
if err != nil {
s.sendErrorResponse(jc, err)
return
}
defer func(file io.ReadSeekCloser) {
err := file.Close()
if err != nil {
s.logger.Error("Error closing file", zap.Error(err))
}
}(file)
newUpload, err2 := s.storage.UploadObject(jc.Request.Context(), s5.GetStorageProtocol(s.protocol), file, size, nil, nil)
if err2 != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
return
}
newUpload.UserID = user
newUpload.UploaderIP = jc.Request.RemoteAddr
err2 = s.metadata.SaveUpload(jc.Request.Context(), *newUpload, true)
if err2 != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
return
}
cid, err2 := encoding.CIDFromHash(newUpload.Hash, newUpload.Size, types.CIDTypeRaw, types.HashTypeBlake3)
if err2 != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
return
}
err2 = s.accounts.PinByHash(newUpload.Hash, user)
if err2 != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
return
}
cidStr, err2 := cid.ToString()
if err2 != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyFileUploadFailed, err2))
return
}
jc.Encode(&SmallUploadResponse{
CID: cidStr,
})
}
func (s *S5API) prepareFileUpload(jc jape.Context) (file io.ReadSeekCloser, size uint64, s5Err *S5Error) {
r := jc.Request
contentType := r.Header.Get("Content-Type")
// Handle multipart form data uploads
if strings.HasPrefix(contentType, "multipart/form-data") {
if err := r.ParseMultipartForm(int64(s.config.Config().Core.PostUploadLimit)); err != nil {
return nil, size, NewS5Error(ErrKeyFileUploadFailed, err)
}
multipartFile, _, err := r.FormFile("file")
if err != nil {
return nil, size, NewS5Error(ErrKeyFileUploadFailed, err)
}
return multipartFile, size, nil
}
// Handle raw body uploads
data, err := io.ReadAll(r.Body)
if err != nil {
return nil, size, NewS5Error(ErrKeyFileUploadFailed, err)
}
buffer := readSeekNopCloser{bytes.NewReader(data)}
size = uint64(len(data))
return buffer, size, nil
}
func (s *S5API) accountRegisterChallenge(jc jape.Context) {
var pubkey string
if jc.DecodeForm("pubKey", &pubkey) != nil {
return
}
challenge := make([]byte, 32)
_, err := rand.Read(challenge)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
decodedKey, err := base64.RawURLEncoding.DecodeString(pubkey)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidFileFormat, err))
return
}
if len(decodedKey) != 33 || int(decodedKey[0]) != int(types.HashTypeEd25519) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyDataIntegrityError, fmt.Errorf("invalid public key format")))
return
}
result := s.db.Create(&models.S5Challenge{
Pubkey: pubkey,
Challenge: base64.RawURLEncoding.EncodeToString(challenge),
Type: "register",
})
if result.Error != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, result.Error))
return
}
jc.Encode(&AccountRegisterChallengeResponse{
Challenge: base64.RawURLEncoding.EncodeToString(challenge),
})
}
func (s *S5API) accountRegister(jc jape.Context) {
var request AccountRegisterRequest
if jc.Decode(&request) != nil {
return
}
decodedKey, err := base64.RawURLEncoding.DecodeString(request.Pubkey)
if err != nil || len(decodedKey) != 33 || int(decodedKey[0]) != int(types.HashTypeEd25519) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidFileFormat, err))
return
}
challenge := models.S5Challenge{
Pubkey: request.Pubkey,
Type: "register",
}
if result := s.db.Where(&challenge).First(&challenge); result.RowsAffected == 0 || result.Error != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, result.Error))
return
}
decodedResponse, err := base64.RawURLEncoding.DecodeString(request.Response)
if err != nil || len(decodedResponse) != 65 {
s.sendErrorResponse(jc, NewS5Error(ErrKeyDataIntegrityError, err))
return
}
decodedChallenge, err := base64.RawURLEncoding.DecodeString(challenge.Challenge)
if err != nil || !bytes.Equal(decodedResponse[1:33], decodedChallenge) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
decodedSignature, err := base64.RawURLEncoding.DecodeString(request.Signature)
if err != nil || !ed25519.Verify(decodedKey[1:], decodedResponse, decodedSignature) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyAuthorizationFailed, err))
return
}
if request.Email == "" {
request.Email = fmt.Sprintf("%s@%s", hex.EncodeToString(decodedKey[1:]), "example.com")
}
if accountExists, _, _ := s.accounts.EmailExists(request.Email); accountExists {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceLimitExceeded, fmt.Errorf("email already exists")))
return
}
if pubkeyExists, _, _ := s.accounts.PubkeyExists(hex.EncodeToString(decodedKey[1:])); pubkeyExists {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceLimitExceeded, fmt.Errorf("pubkey already exists")))
return
}
passwd := make([]byte, 32)
if _, err = rand.Read(passwd); err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
newAccount, err := s.accounts.CreateAccount(request.Email, string(passwd), false)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
rawPubkey := hex.EncodeToString(decodedKey[1:])
if err = s.accounts.AddPubkeyToAccount(*newAccount, rawPubkey); err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
jwt, err := s.accounts.LoginPubkey(rawPubkey, jc.Request.RemoteAddr)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyAuthenticationFailed, err))
return
}
if result := s.db.Delete(&challenge); result.Error != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, result.Error))
return
}
account.SetAuthCookie(jc, s.config, jwt)
}
func (s *S5API) accountLoginChallenge(jc jape.Context) {
var pubkey string
if jc.DecodeForm("pubKey", &pubkey) != nil {
return
}
challenge := make([]byte, 32)
_, err := rand.Read(challenge)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
decodedKey, err := base64.RawURLEncoding.DecodeString(pubkey)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidFileFormat, err))
return
}
if len(decodedKey) != 33 || int(decodedKey[0]) != int(types.HashTypeEd25519) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyUnsupportedFileType, fmt.Errorf("public key not supported")))
return
}
pubkeyExists, _, _ := s.accounts.PubkeyExists(hex.EncodeToString(decodedKey[1:]))
if !pubkeyExists {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, fmt.Errorf("public key does not exist")))
return
}
result := s.db.Create(&models.S5Challenge{
Pubkey: pubkey,
Challenge: base64.RawURLEncoding.EncodeToString(challenge),
Type: "login",
})
if result.Error != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, result.Error))
return
}
jc.Encode(&AccountLoginChallengeResponse{
Challenge: base64.RawURLEncoding.EncodeToString(challenge),
})
}
func (s *S5API) accountLogin(jc jape.Context) {
var request AccountLoginRequest
if jc.Decode(&request) != nil {
return
}
decodedKey, err := base64.RawURLEncoding.DecodeString(request.Pubkey)
if err != nil || len(decodedKey) != 32 {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidFileFormat, err))
return
}
if int(decodedKey[0]) != int(types.HashTypeEd25519) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyUnsupportedFileType, fmt.Errorf("public key type not supported")))
return
}
var challenge models.S5Challenge
result := s.db.Where(&models.S5Challenge{Pubkey: request.Pubkey, Type: "login"}).First(&challenge)
if result.RowsAffected == 0 || result.Error != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, result.Error))
return
}
decodedResponse, err := base64.RawURLEncoding.DecodeString(request.Response)
if err != nil || len(decodedResponse) != 65 {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
decodedChallenge, err := base64.RawURLEncoding.DecodeString(challenge.Challenge)
if err != nil || !bytes.Equal(decodedResponse[1:33], decodedChallenge) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyDataIntegrityError, err))
return
}
decodedSignature, err := base64.RawURLEncoding.DecodeString(request.Signature)
if err != nil || !ed25519.Verify(decodedKey[1:], decodedResponse, decodedSignature) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyAuthorizationFailed, err))
return
}
jwt, err := s.accounts.LoginPubkey(hex.EncodeToString(decodedKey[1:]), jc.Request.RemoteAddr)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyAuthenticationFailed, err))
return
}
if result := s.db.Delete(&challenge); result.Error != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, result.Error))
return
}
account.SetAuthCookie(jc, s.config, jwt)
}
func (s *S5API) accountInfo(jc jape.Context) {
userID := middleware.GetUserFromContext(jc.Request.Context())
_, user, _ := s.accounts.AccountExists(userID)
info := &AccountInfoResponse{
Email: user.Email,
QuotaExceeded: false,
EmailConfirmed: false,
IsRestricted: false,
Tier: AccountTier{
Id: 1,
Name: "default",
UploadBandwidth: math.MaxUint32,
StorageLimit: math.MaxUint32,
Scopes: []interface{}{},
},
}
jc.Encode(info)
}
func (s *S5API) accountStats(jc jape.Context) {
userID := middleware.GetUserFromContext(jc.Request.Context())
_, user, _ := s.accounts.AccountExists(userID)
info := &AccountStatsResponse{
AccountInfoResponse: AccountInfoResponse{
Email: user.Email,
QuotaExceeded: false,
EmailConfirmed: false,
IsRestricted: false,
Tier: AccountTier{
Id: 1,
Name: "default",
UploadBandwidth: math.MaxUint32,
StorageLimit: math.MaxUint32,
Scopes: []interface{}{},
},
},
Stats: AccountStats{
Total: AccountStatsTotal{
UsedStorage: 0,
},
},
}
jc.Encode(info)
}
func (s *S5API) accountPinsBinary(jc jape.Context) {
var cursor uint64
if err := jc.DecodeForm("cursor", &cursor); err != nil {
return
}
userID := middleware.GetUserFromContext(jc.Request.Context())
pins, err := s.accounts.AccountPins(userID, cursor)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
pinResponse := &AccountPinBinaryResponse{Cursor: cursor, Pins: pins}
result, err2 := msgpack.Marshal(pinResponse)
if err2 != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err2))
return
}
jc.ResponseWriter.Header().Set("Content-Type", "application/msgpack")
jc.ResponseWriter.WriteHeader(http.StatusOK)
if _, err := jc.ResponseWriter.Write(result); err != nil {
s.logger.Error("failed to write account pins response", zap.Error(err))
}
}
func (s *S5API) accountPins(jc jape.Context) {
userID := middleware.GetUserFromContext(jc.Request.Context())
pinsRet, err := s.accounts.AccountPins(userID, 0)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
tusRet, err := s.tusHandler.Uploads(jc.Request.Context(), userID)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
pins := make([]AccountPin, len(pinsRet)+len(tusRet))
for i, pin := range pinsRet {
cid, err := encoding.CIDFromHash(pin.Upload.Hash, pin.Upload.Size, types.CIDTypeRaw, types.HashTypeBlake3)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
base64Url, err := cid.Hash.ToBase64Url()
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
pins[i] = AccountPin{
Hash: base64Url,
Size: pin.Upload.Size,
PinnedAt: pin.CreatedAt,
MimeType: pin.Upload.MimeType,
}
}
for i, tus := range tusRet {
size, err := s.tusHandler.GetUploadSize(jc.Request.Context(), tus.Hash)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
cid, err := encoding.CIDFromHash(tus.Hash, uint64(size), types.CIDTypeRaw, types.HashTypeBlake3)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
base64Url, err := cid.Hash.ToBase64Url()
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
pins[i+len(pinsRet)] = AccountPin{
Hash: base64Url,
Size: uint64(size),
PinnedAt: tus.CreatedAt,
MimeType: tus.MimeType,
}
}
jc.Encode(&AccountPinResponse{Pins: pins})
}
func (s *S5API) accountPinDelete(jc jape.Context) {
var cid string
if err := jc.DecodeParam("cid", &cid); err != nil {
return
}
user := middleware.GetUserFromContext(jc.Request.Context())
decodedCid, err := encoding.CIDFromString(cid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
if err := s.accounts.DeletePinByHash(decodedCid.Hash.HashBytes(), user); err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
jc.ResponseWriter.WriteHeader(http.StatusOK)
}
func (s *S5API) getManifestCids(ctx context.Context, cid *encoding.CID, addSelf bool) ([]*encoding.CID, error) {
var cids []*encoding.CID
if middleware.CtxAborted(ctx) {
return nil, ctx.Err()
}
manifest, err := s.getNode().Services().Storage().GetMetadataByCID(cid)
if err != nil {
return nil, err
}
if addSelf {
cids = append(cids, cid)
}
switch cid.Type {
case types.CIDTypeMetadataMedia:
media := manifest.(*s5libmetadata.MediaMetadata)
for _, mediaType := range media.MediaTypes {
lo.ForEach(mediaType, func(format s5libmetadata.MediaFormat, _i int) {
if format.Cid != nil {
cids = append(cids, format.Cid)
}
})
}
case types.CIDTypeDirectory:
dir := manifest.(*s5libmetadata.DirectoryMetadata)
lo.ForEach(lo.Values(dir.Directories.Items()), func(d *s5libmetadata.DirectoryReference, _i int) {
if middleware.CtxAborted(ctx) {
return
}
entry, err := s.getNode().Services().Registry().Get(d.PublicKey)
if err != nil || entry == nil {
s.logger.Error("Error getting registry entry", zap.Error(err))
return
}
cid, err := encoding.CIDFromRegistry(entry.Data())
if err != nil {
s.logger.Error("Error getting CID from registry entry", zap.Error(err))
return
}
childCids, err := s.getManifestCids(ctx, cid, true)
if err != nil {
s.logger.Error("Error getting child manifest CIDs", zap.Error(err))
return
}
cids = append(cids, childCids...)
})
lo.ForEach(lo.Values(dir.Files.Items()), func(f *s5libmetadata.FileReference, _i int) {
cids = append(cids, f.File.CID())
})
case types.CIDTypeMetadataWebapp:
webapp := manifest.(*s5libmetadata.WebAppMetadata)
lo.ForEach(webapp.Paths.Values(), func(f s5libmetadata.WebAppMetadataFileReference, _i int) {
cids = append(cids, f.Cid)
})
}
if middleware.CtxAborted(ctx) {
return nil, ctx.Err()
}
return cids, nil
}
func (s *S5API) accountPinManifest(jc jape.Context, userId uint, cid *encoding.CID, addSelf bool) {
type pinResult struct {
Success bool `json:"success"`
Error error `json:"error,omitempty"`
}
type pinQueueResult struct {
success bool
error error
cid *encoding.CID
}
cids, err := s.getManifestCids(jc.Request.Context(), cid, addSelf)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
q := queue.NewPool(10)
defer q.Release()
rets := make(chan pinQueueResult)
defer close(rets)
results := make(map[string]pinResult, len(cids))
for i := 0; i < len(cids); i++ {
cid := cids[i]
go func(cid *encoding.CID) {
if err := q.QueueTask(func(ctx context.Context) error {
ret := pinQueueResult{
success: true,
error: nil,
cid: cid,
}
err := s.pinEntity(ctx, userId, jc.Request.RemoteAddr, cid)
if err != nil {
s.logger.Error("Error pinning entity", zap.Error(err))
ret.success = false
ret.error = err
}
rets <- ret
return nil
}); err != nil {
s.logger.Error("Error queueing task", zap.Error(err))
rets <- pinQueueResult{
success: false,
error: err,
cid: cid,
}
}
}(cid)
}
go func() {
received := 0
for ret := range rets {
b64, err := ret.cid.ToBase64Url()
if err != nil {
s.logger.Error("Error encoding CID to base64", zap.Error(err))
continue
}
results[b64] = pinResult{
Success: ret.success,
Error: ret.error,
}
received++
if received == len(cids) {
q.Release()
}
}
}()
q.Wait()
if middleware.CtxAborted(jc.Request.Context()) {
return
}
jc.Encode(&results)
}
func (s *S5API) accountPin(jc jape.Context) {
var cid string
if err := jc.DecodeParam("cid", &cid); err != nil {
return
}
userID := middleware.GetUserFromContext(jc.Request.Context())
decodedCid, err := encoding.CIDFromString(cid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
if decodedCid.Type == types.CIDTypeResolver {
entry, err := s.getNode().Services().Registry().Get(decodedCid.Hash.FullBytes())
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyResourceNotFound, err))
return
}
decodedCid, err = encoding.CIDFromRegistry(entry.Data())
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err))
return
}
}
found := true
if err := s.accounts.PinByHash(decodedCid.Hash.HashBytes(), userID); err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
found = false
}
if !found {
if isCidManifest(decodedCid) {
s.accountPinManifest(jc, userID, decodedCid, true)
return
} else {
err = s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, decodedCid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
}
} else {
if isCidManifest(decodedCid) {
cids, err := s.getManifestCids(jc.Request.Context(), decodedCid, false)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
for _, cid := range cids {
err := s.pinEntity(jc.Request.Context(), userID, jc.Request.RemoteAddr, cid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err))
return
}
}
}
}
jc.ResponseWriter.WriteHeader(http.StatusNoContent)
}
func (s *S5API) accountPinStatus(jc jape.Context) {
var cid string
if err := jc.DecodeParam("cid", &cid); err != nil {
return
}
decodedCid, err := encoding.CIDFromString(cid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
meta, err := s._import.GetImport(jc.Request.Context(), decodedCid.Hash.HashBytes())
if err != nil {
jc.Encode(&AccountPinStatusResponse{
Status: models.ImportStatusCompleted,
Progress: 100,
})
return
}
jc.Encode(&AccountPinStatusResponse{
Status: meta.Status,
Progress: meta.Progress,
})
}
func (s *S5API) pinEntity(ctx context.Context, userId uint, userIp string, cid *encoding.CID) error {
found := true
if err := s.accounts.PinByHash(cid.Hash.HashBytes(), userId); err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
found = false
}
if found {
return nil
}
dlUriProvider := s.newStorageLocationProvider(&cid.Hash, true, types.StorageLocationTypeFull, types.StorageLocationTypeFile)
err := dlUriProvider.Start()
if err != nil {
return err
}
locations, err := dlUriProvider.All()
if err != nil {
return err
}
locations = lo.FilterMap(locations, func(location storage2.SignedStorageLocation, index int) (storage2.SignedStorageLocation, bool) {
r := rq.Get(location.Location().BytesURL())
httpReq, err := r.ParseRequest()
if err != nil {
return nil, false
}
res, err := http.DefaultClient.Do(httpReq)
if err != nil {
err = dlUriProvider.Downvote(location)
if err != nil {
s.logger.Error("Error downvoting location", zap.Error(err))
return nil, false
}
return nil, false
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
s.logger.Error("Error closing response body", zap.Error(err))
}
}(res.Body)
// Use io.LimitedReader to limit the download size and attempt to detect if there's more data.
limitedReader := &io.LimitedReader{R: res.Body, N: int64(s.config.Config().Core.PostUploadLimit + 1)}
data, err := io.ReadAll(limitedReader)
if err != nil {
return nil, false
}
if !isCidManifest(cid) {
if limitedReader.N > 0 {
if uint64(len(data)) != cid.Size {
return nil, false
}
}
} else {
dataCont, err := io.ReadAll(res.Body)
if err != nil {
return nil, false
}
data = append(data, dataCont...)
proof, err := s.storage.HashObject(ctx, bytes.NewReader(data), uint64(len(data)))
if err != nil {
return nil, false
}
if !bytes.Equal(proof.Hash, cid.Hash.HashBytes()) {
return nil, false
}
}
return location, true
})
if len(locations) == 0 {
return fmt.Errorf("CID could not be found on the network")
}
location := locations[0]
cid64, err := cid.ToBase64Url()
if err != nil {
return nil
}
if middleware.CtxAborted(ctx) {
return ctx.Err()
}
jobName := fmt.Sprintf("pin-import-%s", cid64)
if job := s.cron.GetJobByName(jobName); job == nil {
job := s.cron.RetryableJob(
cron.RetryableJobParams{
Name: jobName,
Tags: nil,
Function: s.pinImportCronJob,
Args: []interface{}{cid64, location.Location().BytesURL(), location.Location().OutboardBytesURL(), userId},
Attempt: 0,
Limit: 10,
After: nil,
Error: nil,
},
)
_, err = s.cron.CreateJob(job)
if err != nil {
return nil
}
err = s._import.SaveImport(ctx, _import.ImportMetadata{
UserID: userId,
Hash: cid.Hash.HashBytes(),
Protocol: s5.GetStorageProtocol(s.protocol).Name(),
ImporterIP: userIp,
}, true)
if err != nil {
return err
}
}
return nil
}
type dirTryFiles []string
type dirErrorPages map[int]string
func (d *dirTryFiles) UnmarshalText(data []byte) error {
var out []string
err := json.Unmarshal(data, &out)
if err != nil {
return err
}
*d = out
return nil
}
func (d *dirErrorPages) UnmarshalText(data []byte) error {
var out map[int]string
err := json.Unmarshal(data, &out)
if err != nil {
return err
}
*d = out
return nil
}
func (s *S5API) directoryUpload(jc jape.Context) {
// Decode form fields
var (
tryFiles dirTryFiles
errorPages dirErrorPages
name string
)
if err := jc.DecodeForm("tryFiles", &tryFiles); err != nil || jc.DecodeForm("errorPages", &errorPages) != nil || jc.DecodeForm("name", &name) != nil {
return
}
// Verify content type
if contentType := jc.Request.Header.Get("Content-Type"); !strings.HasPrefix(contentType, "multipart/form-data") {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, fmt.Errorf("expected multipart/form-data content type, got %s", contentType)))
return
}
// Parse multipart form with size limit from config
if err := jc.Request.ParseMultipartForm(int64(s.config.Config().Core.PostUploadLimit)); err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err))
return
}
uploads, err := s.processMultipartFiles(jc.Request)
if err != nil {
s.sendErrorResponse(jc, err)
return
}
var webappErrorPages s5libmetadata.WebAppErrorPages
for code, page := range errorPages {
webappErrorPages[code] = page
}
// Generate metadata for the directory upload
app, err := s.createAppMetadata(name, tryFiles, webappErrorPages, uploads)
if err != nil {
s.sendErrorResponse(jc, err)
return
}
// Upload the metadata
cidStr, err := s.uploadAppMetadata(app, jc.Request)
if err != nil {
s.sendErrorResponse(jc, err)
return
}
jc.Encode(&AppUploadResponse{CID: cidStr})
}
func (s *S5API) processMultipartFiles(r *http.Request) (map[string]*metadata.UploadMetadata, error) {
uploadMap := make(map[string]*metadata.UploadMetadata)
user := middleware.GetUserFromContext(r.Context())
for _, files := range r.MultipartForm.File {
for _, fileHeader := range files {
filename := extractMPFilename(fileHeader.Header)
if filename == "" {
return nil, NewS5Error(ErrKeyInvalidOperation, fmt.Errorf("filename not found in multipart file header"))
}
file, err := fileHeader.Open()
if err != nil {
return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
}
defer func(file multipart.File) {
err := file.Close()
if err != nil {
s.logger.Error("Error closing file", zap.Error(err))
}
}(file)
upload, err := s.storage.UploadObject(r.Context(), s5.GetStorageProtocol(s.protocol), file, uint64(fileHeader.Size), nil, nil)
if err != nil {
return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
}
upload.UserID = user
upload.UploaderIP = r.RemoteAddr
err = s.metadata.SaveUpload(r.Context(), *upload, true)
if err != nil {
return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
}
err = s.accounts.PinByHash(upload.Hash, user)
if err != nil {
return nil, NewS5Error(ErrKeyStorageOperationFailed, err)
}
uploadMap[filename] = upload
}
}
return uploadMap, nil
}
func (s *S5API) createAppMetadata(name string, tryFiles []string, errorPages s5libmetadata.WebAppErrorPages, uploads map[string]*metadata.UploadMetadata) (*s5libmetadata.WebAppMetadata, error) {
filesMap := s5libmetadata.NewWebAppFileMap()
for filename, upload := range uploads {
hash := upload.Hash
cid, err := encoding.CIDFromHash(hash, upload.Size, types.CIDTypeRaw, types.HashTypeBlake3)
if err != nil {
return nil, NewS5Error(ErrKeyInternalError, err, "Failed to create CID for file: "+filename)
}
filesMap.Put(filename, s5libmetadata.WebAppMetadataFileReference{
Cid: cid,
ContentType: upload.MimeType,
})
}
filesMap.Sort()
extraMetadataMap := make(map[int]interface{})
for statusCode, page := range errorPages {
extraMetadataMap[statusCode] = page
}
extraMetadata := s5libmetadata.NewExtraMetadata(extraMetadataMap)
// Create the web app metadata object
app := s5libmetadata.NewWebAppMetadata(
name,
tryFiles,
*extraMetadata,
errorPages,
filesMap,
)
return app, nil
}
func (s *S5API) uploadAppMetadata(appData *s5libmetadata.WebAppMetadata, r *http.Request) (string, error) {
userId := middleware.GetUserFromContext(r.Context())
appDataRaw, err := msgpack.Marshal(appData)
if err != nil {
return "", NewS5Error(ErrKeyInternalError, err, "Failed to marshal app s5libmetadata")
}
file := bytes.NewReader(appDataRaw)
upload, err := s.storage.UploadObject(r.Context(), s5.GetStorageProtocol(s.protocol), file, uint64(len(appDataRaw)), nil, nil)
if err != nil {
return "", NewS5Error(ErrKeyStorageOperationFailed, err)
}
upload.UserID = userId
upload.UploaderIP = r.RemoteAddr
err = s.metadata.SaveUpload(r.Context(), *upload, true)
if err != nil {
return "", NewS5Error(ErrKeyStorageOperationFailed, err)
}
err = s.accounts.PinByHash(upload.Hash, userId)
if err != nil {
return "", NewS5Error(ErrKeyStorageOperationFailed, err)
}
// Construct the CID for the newly uploaded s5libmetadata
cid, err := encoding.CIDFromHash(upload.Hash, uint64(len(appDataRaw)), types.CIDTypeMetadataWebapp, types.HashTypeBlake3)
if err != nil {
return "", NewS5Error(ErrKeyInternalError, err, "Failed to create CID for new app s5libmetadata")
}
cidStr, err := cid.ToString()
if err != nil {
return "", NewS5Error(ErrKeyInternalError, err, "Failed to convert CID to string for new app s5libmetadata")
}
return cidStr, nil
}
func (s *S5API) debugDownloadUrls(jc jape.Context) {
var cid string
if err := jc.DecodeParam("cid", &cid); err != nil {
return
}
decodedCid, err := encoding.CIDFromString(cid)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err, "Failed to decode CID"))
return
}
node := s.getNode()
dlUriProvider := s.newStorageLocationProvider(&decodedCid.Hash, false, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge)
if err := dlUriProvider.Start(); err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err, "Failed to start URI provider"))
return
}
locations, err := node.Services().Storage().GetCachedStorageLocations(&decodedCid.Hash, []types.StorageLocationType{
types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge,
}, true)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err, "Failed to get cached storage locations"))
return
}
availableNodes := lo.Keys[string, storage2.StorageLocation](locations)
availableNodesIds := make([]*encoding.NodeId, len(availableNodes))
for i, nodeIdStr := range availableNodes {
nodeId, err := encoding.DecodeNodeId(nodeIdStr)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err, "Failed to decode node ID"))
return
}
availableNodesIds[i] = nodeId
}
sorted, err := node.Services().P2P().SortNodesByScore(availableNodesIds)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyNetworkError, err, "Failed to sort nodes by score"))
return
}
output := make([]string, len(sorted))
for i, nodeId := range sorted {
nodeIdStr, err := nodeId.ToString()
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInternalError, err, "Failed to convert node ID to string"))
return
}
output[i] = locations[nodeIdStr].BytesURL()
}
jc.ResponseWriter.WriteHeader(http.StatusOK)
_, err = jc.ResponseWriter.Write([]byte(strings.Join(output, "\n")))
if err != nil {
s.logger.Error("Failed to write response", zap.Error(err))
}
}
func (s *S5API) registryQuery(jc jape.Context) {
var pk string
if err := jc.DecodeForm("pk", &pk); err != nil {
return
}
pkBytes, err := base64.RawURLEncoding.DecodeString(pk)
if err != nil {
s5Err := NewS5Error(ErrKeyInvalidFileFormat, err)
s.sendErrorResponse(jc, s5Err)
return
}
entry, err := s.getNode().Services().Registry().Get(pkBytes)
if err != nil {
s5ErrKey := ErrKeyStorageOperationFailed
s5Err := NewS5Error(s5ErrKey, err)
s.sendErrorResponse(jc, s5Err)
return
}
if entry == nil {
jc.ResponseWriter.WriteHeader(http.StatusNotFound)
return
}
response := RegistryQueryResponse{
Pk: base64.RawURLEncoding.EncodeToString(entry.PK()),
Revision: entry.Revision(),
Data: base64.RawURLEncoding.EncodeToString(entry.Data()),
Signature: base64.RawURLEncoding.EncodeToString(entry.Signature()),
}
jc.Encode(response)
}
func (s *S5API) registrySet(jc jape.Context) {
var request RegistrySetRequest
if err := jc.Decode(&request); err != nil {
return
}
pk, err := base64.RawURLEncoding.DecodeString(request.Pk)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidFileFormat, err, "Error decoding public key"))
return
}
data, err := base64.RawURLEncoding.DecodeString(request.Data)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidFileFormat, err, "Error decoding data"))
return
}
signature, err := base64.RawURLEncoding.DecodeString(request.Signature)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidFileFormat, err, "Error decoding signature"))
return
}
entry := protocol.NewSignedRegistryEntry(pk, request.Revision, data, signature)
err = s.getNode().Services().Registry().Set(entry, false, nil)
if err != nil {
s.sendErrorResponse(jc, NewS5Error(ErrKeyStorageOperationFailed, err, "Error setting registry entry"))
return
}
}
func (s *S5API) registrySubscription(jc jape.Context) {
// Create a context for the WebSocket operations
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var listeners []func()
// Accept the WebSocket connection
c, err := websocket.Accept(jc.ResponseWriter, jc.Request, nil)
if err != nil {
s.logger.Error("error accepting websocket connection", zap.Error(err))
return
}
defer func() {
// Close the WebSocket connection gracefully
err := c.Close(websocket.StatusNormalClosure, "connection closed")
if err != nil {
s.logger.Error("error closing websocket connection", zap.Error(err))
}
// Clean up all listeners when the connection is closed
for _, listener := range listeners {
listener()
}
}()
// Main loop for reading messages
for {
_, data, err := c.Read(ctx)
if err != nil {
if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
// Normal closure
s.logger.Info("websocket connection closed normally")
} else {
// Handle different types of errors
s.logger.Error("error in websocket connection", zap.Error(err))
}
break
}
decoder := msgpack.NewDecoder(bytes.NewReader(data))
// Assuming method indicates the type of operation, validate it
method, err := decoder.DecodeInt()
if err != nil {
s.logger.Error("error decoding method", zap.Error(err))
continue
}
if method != 2 {
s.logger.Error("invalid method", zap.Int64("method", int64(method)))
continue
}
sre, err := decoder.DecodeBytes()
if err != nil {
s.logger.Error("error decoding sre", zap.Error(err))
continue
}
// Listen for updates on the registry entry and send updates via WebSocket
off, err := s.getNode().Services().Registry().Listen(sre, func(entry protocol.SignedRegistryEntry) {
encoded, err := msgpack.Marshal(entry)
if err != nil {
s.logger.Error("error encoding entry", zap.Error(err))
return
}
// Write updates to the WebSocket connection
if err := c.Write(ctx, websocket.MessageBinary, encoded); err != nil {
s.logger.Error("error writing to websocket", zap.Error(err))
}
})
if err != nil {
s.logger.Error("error setting up listener for registry", zap.Error(err))
break
}
listeners = append(listeners, off) // Add the listener's cleanup function to the list
}
}
func (s *S5API) getNode() *node.Node {
return s.protocol.Node()
}
func (s *S5API) downloadBlob(jc jape.Context) {
var cid string
if jc.DecodeParam("cid", &cid) != nil {
return
}
cid = strings.Split(cid, ".")[0]
cidDecoded, err := encoding.CIDFromString(cid)
if jc.Check("error decoding cid", err) != nil {
return
}
dlUriProvider := s.newStorageLocationProvider(&cidDecoded.Hash, true, types.StorageLocationTypeFull, types.StorageLocationTypeFile, types.StorageLocationTypeBridge)
err = dlUriProvider.Start()
if jc.Check("error starting search", err) != nil {
return
}
next, err := dlUriProvider.Next()
if jc.Check("error fetching blob", err) != nil {
return
}
http.Redirect(jc.ResponseWriter, jc.Request, next.Location().BytesURL(), http.StatusFound)
}
func (s *S5API) debugStorageLocations(jc jape.Context) {
var hash string
if jc.DecodeParam("hash", &hash) != nil {
return
}
var kinds string
if jc.DecodeForm("kinds", &kinds) != nil {
return
}
decodedHash, err := encoding.MultihashFromBase64Url(hash)
if jc.Check("error decoding hash", err) != nil {
return
}
typeList := strings.Split(kinds, ",")
typeIntList := make([]types.StorageLocationType, 0)
for _, typeStr := range typeList {
typeInt, err := strconv.Atoi(typeStr)
if err != nil {
continue
}
typeIntList = append(typeIntList, types.StorageLocationType(typeInt))
}
if len(typeIntList) == 0 {
typeIntList = []types.StorageLocationType{
types.StorageLocationTypeFull,
types.StorageLocationTypeFile,
types.StorageLocationTypeBridge,
types.StorageLocationTypeArchive,
}
}
dlUriProvider := s.newStorageLocationProvider(decodedHash, false, typeIntList...)
err = dlUriProvider.Start()
if jc.Check("error starting search", err) != nil {
return
}
_, err = dlUriProvider.Next()
if jc.Check("error fetching locations", err) != nil {
return
}
locations, err := s.getNode().Services().Storage().GetCachedStorageLocations(decodedHash, typeIntList, true)
if jc.Check("error getting cached locations", err) != nil {
return
}
availableNodes := lo.Keys[string, storage2.StorageLocation](locations)
availableNodesIds := make([]*encoding.NodeId, len(availableNodes))
for i, nodeIdStr := range availableNodes {
nodeId, err := encoding.DecodeNodeId(nodeIdStr)
if jc.Check("error decoding node id", err) != nil {
return
}
availableNodesIds[i] = nodeId
}
availableNodesIds, err = s.getNode().Services().P2P().SortNodesByScore(availableNodesIds)
if jc.Check("error sorting nodes", err) != nil {
return
}
debugLocations := make([]DebugStorageLocation, len(availableNodes))
for i, nodeId := range availableNodesIds {
nodeIdStr, err := nodeId.ToBase58()
if jc.Check("error encoding node id", err) != nil {
return
}
score, err := s.getNode().Services().P2P().GetNodeScore(nodeId)
if jc.Check("error getting node score", err) != nil {
return
}
debugLocations[i] = DebugStorageLocation{
Type: locations[nodeIdStr].Type(),
Parts: locations[nodeIdStr].Parts(),
Expiry: locations[nodeIdStr].Expiry(),
NodeId: nodeIdStr,
Score: score,
}
}
jc.Encode(&DebugStorageLocationsResponse{
Locations: debugLocations,
})
}
func (s *S5API) downloadMetadata(jc jape.Context) {
var cid string
if jc.DecodeParam("cid", &cid) != nil {
return
}
cidDecoded, err := encoding.CIDFromString(cid)
if jc.Check("error decoding cid", err) != nil {
s.logger.Error("error decoding cid", zap.Error(err))
return
}
switch cidDecoded.Type {
case types.CIDTypeRaw:
_ = jc.Error(errors.New("Raw CIDs do not have metadata"), http.StatusUnsupportedMediaType)
return
case types.CIDTypeResolver:
_ = jc.Error(errors.New("Resolver CIDs not yet supported"), http.StatusUnsupportedMediaType)
return
}
meta, err := s.getNode().Services().Storage().GetMetadataByCID(cidDecoded)
if jc.Check("error getting metadata", err) != nil {
s.logger.Error("error getting metadata", zap.Error(err))
return
}
if cidDecoded.Type != types.CIDTypeBridge {
jc.ResponseWriter.Header().Set("Cache-Control", "public, max-age=31536000")
} else {
jc.ResponseWriter.Header().Set("Cache-Control", "public, max-age=60")
}
jc.Encode(&meta)
}
func (s *S5API) downloadFile(jc jape.Context) {
var cid string
if jc.DecodeParam("cid", &cid) != nil {
return
}
var hashBytes []byte
var typ types.CIDType
isProof := false
if strings.HasSuffix(cid, storage.PROOF_EXTENSION) {
isProof = true
cid = strings.TrimSuffix(cid, storage.PROOF_EXTENSION)
}
cidDecoded, err := encoding.CIDFromString(cid)
if err != nil {
hashDecoded, err := encoding.MultihashFromBase64Url(cid)
if jc.Check("error decoding as cid or hash", err) != nil {
return
}
hashBytes = hashDecoded.HashBytes()
} else {
hashBytes = cidDecoded.Hash.HashBytes()
typ = cidDecoded.Type
}
file := s.newFile(FileParams{
Hash: hashBytes,
Type: typ,
})
if !file.Exists() {
jc.ResponseWriter.WriteHeader(http.StatusNotFound)
return
}
defer func(file io.ReadCloser) {
err := file.Close()
if err != nil {
s.logger.Error("error closing file", zap.Error(err))
}
}(file)
if isProof {
proof, err := file.Proof()
if jc.Check("error getting proof", err) != nil {
return
}
jc.ResponseWriter.Header().Set("Content-Type", "application/octet-stream")
http.ServeContent(jc.ResponseWriter, jc.Request, fmt.Sprintf("%.obao", file.Name()), file.Modtime(), bytes.NewReader(proof))
return
}
var mimeType string
if len(file.Mime()) > 0 {
mimeType = file.Mime()
}
if len(mimeType) == 0 {
detectedType, err := mimetype.DetectReader(file)
if err != nil {
s.logger.Error("error detecting mime type", zap.Error(err))
_ = jc.Error(err, http.StatusInternalServerError)
return
}
mimeType = detectedType.String()
_, err = file.Seek(0, io.SeekStart)
if err != nil {
s.logger.Error("error seeking file", zap.Error(err))
_ = jc.Error(err, http.StatusInternalServerError)
return
}
}
if len(mimeType) == 0 {
mimeType = "application/octet-stream"
}
jc.ResponseWriter.Header().Set("Content-Type", mimeType)
http.ServeContent(jc.ResponseWriter, jc.Request, file.Name(), file.Modtime(), file)
}
func (s *S5API) sendErrorResponse(jc jape.Context, err error) {
var statusCode int
switch e := err.(type) {
case *S5Error:
statusCode = e.HttpStatus()
case *account.AccountError:
mappedCode, ok := account.ErrorCodeToHttpStatus[e.Key]
if !ok {
statusCode = http.StatusInternalServerError
} else {
statusCode = mappedCode
}
default:
statusCode = http.StatusInternalServerError
err = errors.New("An internal server error occurred.")
}
_ = jc.Error(err, statusCode)
}
func (s *S5API) newStorageLocationProvider(hash *encoding.Multihash, excludeSelf bool, types ...types.StorageLocationType) storage2.StorageLocationProvider {
excludeNodes := make([]*encoding.NodeId, 0)
if excludeSelf {
excludeNodes = append(excludeNodes, s.getNode().NodeId())
}
return provider.NewStorageLocationProvider(provider.StorageLocationProviderParams{
Services: s.getNode().Services(),
Hash: hash,
LocationTypes: types,
ServiceParams: service.ServiceParams{
Logger: s.logger,
Config: s.getNode().Config(),
Db: s.getNode().Db(),
},
ExcludeNodes: excludeNodes,
})
}
func (s *S5API) newFile(params FileParams) *S5File {
params.Protocol = s.protocol
params.Metadata = s.metadata
params.Storage = s.storage
params.Tus = s.tusHandler
return NewFile(params)
}
func (s *S5API) pinImportCronJob(cid string, url string, proofUrl string, userId uint) error {
ctx := context.Background()
totalStages := 3
// Parse CID early to avoid unnecessary operations if it fails.
parsedCid, err := encoding.CIDFromString(cid)
if err != nil {
s.logger.Error("error parsing cid", zap.Error(err))
return err
}
err = s._import.UpdateStatus(ctx, parsedCid.Hash.HashBytes(), models.ImportStatusProcessing)
if err != nil {
return err
}
// Function to streamline error handling and closing of response body.
closeBody := func(body io.ReadCloser) {
if err := body.Close(); err != nil {
s.logger.Error("error closing response body", zap.Error(err))
}
}
// Inline fetching and reading body, directly incorporating your checks.
fetchAndProcess := func(fetchUrl string, progressStage int) ([]byte, error) {
req, err := rq.Get(fetchUrl).ParseRequest()
if err != nil {
s.logger.Error("error parsing request", zap.Error(err))
return nil, err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
s.logger.Error("error executing request", zap.Error(err))
return nil, err
}
defer closeBody(res.Body)
if res.StatusCode != http.StatusOK {
errMsg := "error fetching URL: " + fetchUrl
s.logger.Error(errMsg, zap.String("status", res.Status))
return nil, fmt.Errorf(errMsg+" with status: %s", res.Status)
}
data, err := io.ReadAll(res.Body)
if err != nil {
s.logger.Error("error reading response body", zap.Error(err))
return nil, err
}
err = s._import.UpdateProgress(ctx, parsedCid.Hash.HashBytes(), progressStage, totalStages)
if err != nil {
return nil, err
}
return data, nil
}
saveAndPin := func(upload *metadata.UploadMetadata) error {
err = s._import.UpdateProgress(ctx, parsedCid.Hash.HashBytes(), 3, totalStages)
if err != nil {
return err
}
upload.UserID = userId
if err := s.metadata.SaveUpload(ctx, *upload, true); err != nil {
return err
}
if err := s.accounts.PinByHash(upload.Hash, userId); err != nil {
return err
}
err = s._import.DeleteImport(ctx, upload.Hash)
if err != nil {
return err
}
return nil
}
// Fetch file and process if under post upload limit.
if parsedCid.Size <= s.config.Config().Core.PostUploadLimit {
fileData, err := fetchAndProcess(url, 1)
if err != nil {
return err // Error logged in fetchAndProcess
}
hash, err := s.storage.HashObject(ctx, bytes.NewReader(fileData), uint64(len(fileData)))
if err != nil {
s.logger.Error("error hashing object", zap.Error(err))
return err
}
if !bytes.Equal(hash.Hash, parsedCid.Hash.HashBytes()) {
return fmt.Errorf("hash mismatch")
}
err = s._import.UpdateProgress(ctx, parsedCid.Hash.HashBytes(), 2, totalStages)
if err != nil {
return err
}
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), bytes.NewReader(fileData), parsedCid.Size, nil, hash)
if err != nil {
return err
}
err = saveAndPin(upload)
if err != nil {
return err
}
return nil
}
// Fetch proof.
proof, err := fetchAndProcess(proofUrl, 1)
if err != nil {
return err
}
baoProof := bao.Result{
Hash: parsedCid.Hash.HashBytes(),
Proof: proof,
Length: uint(parsedCid.Size),
}
client, err := s.storage.S3Client(ctx)
if err != nil {
s.logger.Error("error getting s3 client", zap.Error(err))
return err
}
req, err := rq.Get(url).ParseRequest()
if err != nil {
s.logger.Error("error parsing request", zap.Error(err))
return err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
s.logger.Error("error executing request", zap.Error(err))
return err
}
defer closeBody(res.Body)
verifier := bao.NewVerifier(res.Body, baoProof, s.logger)
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
s.logger.Error("error closing verifier stream", zap.Error(err))
}
}(verifier)
if parsedCid.Size < storage.S3_MULTIPART_MIN_PART_SIZE {
_, err = client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
Key: aws.String(cid),
Body: verifier,
ContentLength: aws.Int64(int64(parsedCid.Size)),
})
if err != nil {
s.logger.Error("error uploading object", zap.Error(err))
return err
}
} else {
err := s.storage.S3MultipartUpload(ctx, verifier, s.config.Config().Core.Storage.S3.BufferBucket, cid, parsedCid.Size)
if err != nil {
s.logger.Error("error uploading object", zap.Error(err))
return err
}
}
err = s._import.UpdateProgress(ctx, parsedCid.Hash.HashBytes(), 2, totalStages)
if err != nil {
return err
}
upload, err := s.storage.UploadObject(ctx, s5.GetStorageProtocol(s.protocol), nil, 0, &renter.MultiPartUploadParams{
ReaderFactory: func(start uint, end uint) (io.ReadCloser, error) {
rangeHeader := "bytes=%d-"
if end != 0 {
rangeHeader += "%d"
rangeHeader = fmt.Sprintf("bytes=%d-%d", start, end)
} else {
rangeHeader = fmt.Sprintf("bytes=%d-", start)
}
object, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(s.config.Config().Core.Storage.S3.BufferBucket),
Key: aws.String(cid),
Range: aws.String(rangeHeader),
})
if err != nil {
return nil, err
}
return object.Body, nil
},
Bucket: s.config.Config().Core.Storage.S3.BufferBucket,
FileName: s5.GetStorageProtocol(s.protocol).EncodeFileName(parsedCid.Hash.HashBytes()),
Size: parsedCid.Size,
UploadIDHandler: nil,
}, &baoProof)
if err != nil {
s.logger.Error("error uploading object", zap.Error(err))
return err
}
err = saveAndPin(upload)
if err != nil {
return err
}
return nil
}
func (s *S5API) Domain() string {
return router.BuildSubdomain(s, s.config)
}
func (s *S5API) AuthTokenName() string {
return "s5-auth-token"
}
func isCidManifest(cid *encoding.CID) bool {
mTypes := []types.CIDType{
types.CIDTypeMetadataMedia,
types.CIDTypeMetadataWebapp,
types.CIDTypeUserIdentity,
types.CIDTypeDirectory,
}
return slices.Contains(mTypes, cid.Type)
}
func extractMPFilename(header textproto.MIMEHeader) string {
cd := header.Get("Content-Disposition")
if cd == "" {
return ""
}
_, params, err := mime.ParseMediaType(cd)
if err != nil {
return ""
}
filename := params["filename"]
if filename == "" {
return ""
}
return filename
}