fix: add missing tus background worker

This commit is contained in:
Derrick Hammer 2024-02-17 23:55:25 -05:00
parent 445084ca5b
commit 2ce26239da
Signed by: pcfreak30
GPG Key ID: C997C339BE476FF2
1 changed files with 84 additions and 2 deletions

View File

@ -10,6 +10,8 @@ import (
"strings" "strings"
"time" "time"
"git.lumeweb.com/LumeWeb/portal/api/middleware"
"go.uber.org/fx" "go.uber.org/fx"
"git.lumeweb.com/LumeWeb/portal/account" "git.lumeweb.com/LumeWeb/portal/account"
@ -67,8 +69,8 @@ type TusHandlerParams struct {
Metadata metadata.MetadataService Metadata metadata.MetadataService
} }
func NewTusHandler(params TusHandlerParams) *TusHandler { func NewTusHandler(lc fx.Lifecycle, params TusHandlerParams) *TusHandler {
return &TusHandler{ th := &TusHandler{
config: params.Config, config: params.Config,
db: params.Db, db: params.Db,
logger: params.Logger, logger: params.Logger,
@ -77,6 +79,15 @@ func NewTusHandler(params TusHandlerParams) *TusHandler {
accounts: params.Accounts, accounts: params.Accounts,
metadata: params.Metadata, metadata: params.Metadata,
} }
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
go th.worker()
return nil
},
})
return th
} }
func (t *TusHandler) Init() error { func (t *TusHandler) Init() error {
@ -420,6 +431,77 @@ func (t *TusHandler) uploadTask(ctx context.Context, upload *models.TusUpload) e
return nil return nil
} }
func (t *TusHandler) worker() {
for {
select {
case info := <-t.tus.CreatedUploads:
hash, ok := info.Upload.MetaData["hash"]
errorResponse := tusd.HTTPResponse{StatusCode: 400, Header: nil}
if !ok {
t.logger.Error("Missing hash in metadata")
continue
}
uploaderID, ok := info.Context.Value(middleware.DEFAULT_AUTH_CONTEXT_KEY).(uint64)
if !ok {
errorResponse.Body = "Missing user id in context"
info.Upload.StopUpload(errorResponse)
t.logger.Error("Missing user id in context")
continue
}
uploaderIP := info.HTTPRequest.RemoteAddr
decodedHash, err := encoding.MultihashFromBase64Url(hash)
if err != nil {
errorResponse.Body = "Could not decode hash"
info.Upload.StopUpload(errorResponse)
t.logger.Error("Could not decode hash", zap.Error(err))
continue
}
_, err = t.CreateUpload(info.Context, decodedHash.HashBytes(), info.Upload.ID, uint(uploaderID), uploaderIP, info.Context.Value("protocol").(string))
if err != nil {
errorResponse.Body = "Could not create tus upload"
info.Upload.StopUpload(errorResponse)
t.logger.Error("Could not create tus upload", zap.Error(err))
continue
}
case info := <-t.tus.UploadProgress:
err := t.UploadProgress(info.Context, info.Upload.ID)
if err != nil {
t.logger.Error("Could not update tus upload", zap.Error(err))
continue
}
case info := <-t.tus.TerminatedUploads:
err := t.DeleteUpload(info.Context, info.Upload.ID)
if err != nil {
t.logger.Error("Could not delete tus upload", zap.Error(err))
continue
}
case info := <-t.tus.CompleteUploads:
if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) {
continue
}
err := t.UploadCompleted(info.Context, info.Upload.ID)
if err != nil {
t.logger.Error("Could not complete tus upload", zap.Error(err))
continue
}
err = t.ScheduleUpload(info.Context, info.Upload.ID)
if err != nil {
t.logger.Error("Could not schedule tus upload", zap.Error(err))
continue
}
}
}
}
func splitS3Ids(id string) (objectId, multipartId string) { func splitS3Ids(id string) (objectId, multipartId string) {
index := strings.Index(id, "+") index := strings.Index(id, "+")
if index == -1 { if index == -1 {