diff --git a/protocols/s5/tus.go b/protocols/s5/tus.go index b4fda80..985fef6 100644 --- a/protocols/s5/tus.go +++ b/protocols/s5/tus.go @@ -10,6 +10,8 @@ import ( "strings" "time" + "git.lumeweb.com/LumeWeb/portal/api/middleware" + "go.uber.org/fx" "git.lumeweb.com/LumeWeb/portal/account" @@ -67,8 +69,8 @@ type TusHandlerParams struct { Metadata metadata.MetadataService } -func NewTusHandler(params TusHandlerParams) *TusHandler { - return &TusHandler{ +func NewTusHandler(lc fx.Lifecycle, params TusHandlerParams) *TusHandler { + th := &TusHandler{ config: params.Config, db: params.Db, logger: params.Logger, @@ -77,6 +79,15 @@ func NewTusHandler(params TusHandlerParams) *TusHandler { accounts: params.Accounts, metadata: params.Metadata, } + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go th.worker() + return nil + }, + }) + + return th } func (t *TusHandler) Init() error { @@ -420,6 +431,77 @@ func (t *TusHandler) uploadTask(ctx context.Context, upload *models.TusUpload) e return nil } + +func (t *TusHandler) worker() { + + for { + select { + case info := <-t.tus.CreatedUploads: + hash, ok := info.Upload.MetaData["hash"] + errorResponse := tusd.HTTPResponse{StatusCode: 400, Header: nil} + if !ok { + t.logger.Error("Missing hash in metadata") + continue + } + + uploaderID, ok := info.Context.Value(middleware.DEFAULT_AUTH_CONTEXT_KEY).(uint64) + if !ok { + errorResponse.Body = "Missing user id in context" + info.Upload.StopUpload(errorResponse) + t.logger.Error("Missing user id in context") + continue + } + + uploaderIP := info.HTTPRequest.RemoteAddr + + decodedHash, err := encoding.MultihashFromBase64Url(hash) + + if err != nil { + errorResponse.Body = "Could not decode hash" + info.Upload.StopUpload(errorResponse) + t.logger.Error("Could not decode hash", zap.Error(err)) + continue + } + + _, err = t.CreateUpload(info.Context, decodedHash.HashBytes(), info.Upload.ID, uint(uploaderID), uploaderIP, info.Context.Value("protocol").(string)) + if err != nil { + errorResponse.Body = "Could not create tus upload" + info.Upload.StopUpload(errorResponse) + t.logger.Error("Could not create tus upload", zap.Error(err)) + continue + } + case info := <-t.tus.UploadProgress: + err := t.UploadProgress(info.Context, info.Upload.ID) + if err != nil { + t.logger.Error("Could not update tus upload", zap.Error(err)) + continue + } + case info := <-t.tus.TerminatedUploads: + err := t.DeleteUpload(info.Context, info.Upload.ID) + if err != nil { + t.logger.Error("Could not delete tus upload", zap.Error(err)) + continue + } + + case info := <-t.tus.CompleteUploads: + if !(!info.Upload.SizeIsDeferred && info.Upload.Offset == info.Upload.Size) { + continue + } + err := t.UploadCompleted(info.Context, info.Upload.ID) + if err != nil { + t.logger.Error("Could not complete tus upload", zap.Error(err)) + continue + } + err = t.ScheduleUpload(info.Context, info.Upload.ID) + if err != nil { + t.logger.Error("Could not schedule tus upload", zap.Error(err)) + continue + } + + } + } +} + func splitS3Ids(id string) (objectId, multipartId string) { index := strings.Index(id, "+") if index == -1 {