diff --git a/go.mod b/go.mod index c9107b7..c90d1db 100644 --- a/go.mod +++ b/go.mod @@ -141,6 +141,7 @@ require ( go.sia.tech/mux v1.2.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect golang.org/x/net v0.9.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect diff --git a/go.sum b/go.sum index bbc4382..b7dafd0 100644 --- a/go.sum +++ b/go.sum @@ -1275,6 +1275,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= +golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/tus/tus.go b/tus/tus.go index 9f8bfaa..d800308 100644 --- a/tus/tus.go +++ b/tus/tus.go @@ -16,8 +16,8 @@ import ( tusd "github.com/tus/tusd/pkg/handler" "github.com/tus/tusd/pkg/memorylocker" "go.uber.org/zap" + "golang.org/x/exp/slices" "io" - "log" ) const TUS_API_PATH = "/files/tus" @@ -105,6 +105,8 @@ func tusStartup() { defer rows.Close() + processedHashes := make([]string, 0) + for rows.Next() { var tusUpload model.Tus err := db.Get().ScanRows(rows, &tusUpload) @@ -112,15 +114,28 @@ func tusStartup() { logger.Get().Error("failed to scan tus records", zap.Error(err)) return } - if err := tusQueue.QueueTask(func(ctx context.Context) error { - upload, err := store.GetUpload(nil, tusUpload.UploadID) + + upload, err := store.GetUpload(nil, tusUpload.UploadID) + if err != nil { + logger.Get().Error("failed to query tus upload", zap.Error(err)) + db.Get().Delete(&tusUpload) + continue + } + + if slices.Contains(processedHashes, tusUpload.Hash) { + err := terminateUpload(upload) if err != nil { - logger.Get().Error("failed to query tus upload", zap.Error(err)) - return err + logger.Get().Error("failed to terminate tus upload", zap.Error(err)) } + continue + } + + if err := tusQueue.QueueTask(func(ctx context.Context) error { return tusWorker(&upload) }); err != nil { - log.Print(err) + logger.Get().Error("failed to queue tus upload", zap.Error(err)) + } else { + processedHashes = append(processedHashes, tusUpload.Hash) } } }