From 866739007cc745c18068eb7a8d67ac22fcc60682 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Fri, 1 Mar 2024 04:27:24 -0500 Subject: [PATCH] refactor: use golang-queue to parallel process pinning checks --- api/s5/s5.go | 66 ++++++++++++++++++++++++++++++++++++++++------------ go.mod | 2 ++ go.sum | 38 +++++------------------------- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/api/s5/s5.go b/api/s5/s5.go index 298960c..a5f5c79 100644 --- a/api/s5/s5.go +++ b/api/s5/s5.go @@ -60,6 +60,7 @@ import ( "git.lumeweb.com/LumeWeb/portal/protocols/s5" "github.com/ddo/rq" dnslink "github.com/dnslink-std/go" + "github.com/golang-queue/queue" "github.com/rs/cors" "go.sia.tech/jape" "go.uber.org/fx" @@ -860,34 +861,69 @@ func (s *S5API) accountPinManifest(jc jape.Context, userId uint, cid *encoding.C error error } + type pinQueueResult struct { + success bool + error error + cid *encoding.CID + } + cids, err := s.getManifestCids(cid) if err != nil { s.sendErrorResponse(jc, NewS5Error(ErrKeyInvalidOperation, err)) return } + q := queue.NewPool(10) + defer q.Release() + rets := make(chan pinQueueResult, len(cids)) + results := make(map[string]pinResult, len(cids)) - lo.ForEach(cids, func(c *encoding.CID, _i int) { - ret := pinResult{ - success: true, - error: nil, - } - err := s.pinEntity(jc.Request.Context(), userId, c) - if err != nil { - s.logger.Error("Error pinning entity", zap.Error(err)) - ret.success = false - ret.error = err - } + for i := 0; i < len(cids); i++ { + go func(cid *encoding.CID) { + if err := q.QueueTask(func(ctx context.Context) error { + ret := pinQueueResult{ + success: true, + error: nil, + cid: cid, + } + err := s.pinEntity(ctx, userId, cid) + if err != nil { + s.logger.Error("Error pinning entity", zap.Error(err)) + ret.success = false + ret.error = err + } - b64, err := c.ToBase64Url() + rets <- ret + return nil + }); err != nil { + s.logger.Error("Error queueing task", zap.Error(err)) + rets <- pinQueueResult{ + success: false, + error: err, + cid: cid, + } + } + }(cids[i]) + } + + go func() { + q.Wait() + close(rets) + }() + + for ret := range rets { + b64, err := ret.cid.ToBase64Url() if err != nil { s.logger.Error("Error encoding CID to base64", zap.Error(err)) - return + continue } - results[b64] = ret - }) + results[b64] = pinResult{ + success: ret.success, + error: ret.error, + } + } jc.Encode(&results) } diff --git a/go.mod b/go.mod index 4e23616..f601985 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/go-gorm/caches/v4 v4.0.0 github.com/go-resty/resty/v2 v2.11.0 github.com/golang-jwt/jwt/v5 v5.2.0 + github.com/golang-queue/queue v0.2.0 github.com/google/uuid v1.6.0 github.com/hashicorp/go-plugin v1.6.0 github.com/julienschmidt/httprouter v1.3.0 @@ -85,6 +86,7 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect + github.com/jpillora/backoff v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/klauspost/reedsolomon v1.12.1 // indirect github.com/magiconair/properties v1.8.7 // indirect diff --git a/go.sum b/go.sum index 935254d..abb42f0 100644 --- a/go.sum +++ b/go.sum @@ -1,36 +1,4 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301062420-429565562d76 h1:k0oxtlOCbyY4IYuYDfFC0Qkz64Vho0R/c1h2nGgBV8A= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301062420-429565562d76/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301063122-5b7d7866624f h1:uZzRL+rFDO/YG3NLGrQko+If4z9+u6zlVecWLPbvpts= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301063122-5b7d7866624f/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301063618-578cdba32e44 h1:n02DmvGymKngIuDCKqkADETCH6JKKi54feat2M3jrcY= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301063618-578cdba32e44/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301064115-258031cb8f4d h1:4ToG216vYeZc5ZHHaxwg76ahoKKmEbnyiHZ5E2KrCv0= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301064115-258031cb8f4d/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301064900-280e5b1d71e8 h1:VltmTDXrk/5iKJaXIKDjRnB6wrYV7PgPDbdz1hwJzCs= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301064900-280e5b1d71e8/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301065343-6689c95eea95 h1:+t4+YMzB1XLhvLA/48vL8LzhoXt4nfrhaXtWr0CK+BQ= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301065343-6689c95eea95/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301071935-efb11a9c5007 h1:fsHVh72zIILEpRznjo56Bkvz2LWHRrM6FNJqOM5J7wA= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301071935-efb11a9c5007/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301072922-7c6d11258f43 h1:+qHJtFW7cY8r98mGW7QSCNUrrws1xksIooa0QAvXHug= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301072922-7c6d11258f43/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301073611-f350a37e5864 h1:tPpTcjDVsYF0c2CtKpfsrDFsp5StL0AzvtLcsDS6rsY= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301073611-f350a37e5864/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301074610-28ff1eed4862 h1:cCfrkc74BM/2Jm/5Ap0kBpsyUPnJYrjQFvCwL+e9CmY= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301074610-28ff1eed4862/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301074950-9485c023e7ec h1:UaMrpJV2TdJtGd8x6FsfIxubOeT+xHdXiarz1npdxW0= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301074950-9485c023e7ec/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301080130-ab37004d16fa h1:vr28MjX+I50QLT6riMkMFyISUoIchFI+LcnBNK7Ck1Y= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301080130-ab37004d16fa/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301080511-52c5af78a962 h1:b3u73DD2+aWEr1PwEbBRD9ODfCrkjjahvTpO1qd3mCI= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301080511-52c5af78a962/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301082131-05522522bf0c h1:16p+kfbPVCFlwNRNnVBTn0pqFup7/UotCYFsLRSIVq0= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301082131-05522522bf0c/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301082520-c00fe563898a h1:e9HUeq6z+gBdxQ20SAb+RAftmxyHm8iGe7x8Ani96TU= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301082520-c00fe563898a/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301083026-4b6f71ea1a1c h1:wf9ea9HovQC1K/N758WvdbkIGPZ2PMAuwYhdhwu5jSY= -git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301083026-4b6f71ea1a1c/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301084105-1c8efbfba8e4 h1:jr+kDSgITmLeTf7rfcp9EWToKaehY52+VruTwA2uzg8= git.lumeweb.com/LumeWeb/libs5-go v0.0.0-20240301084105-1c8efbfba8e4/go.mod h1:h/1HUjA2mWT14QsRprG6o7WDeP3paAB23tkh/OczmUk= github.com/Acconut/go-httptest-recorder v1.0.0 h1:TAv2dfnqp/l+SUvIaMAUK4GeN4+wqb6KZsFFFTGhoJg= @@ -49,6 +17,8 @@ github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmH github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSiWQsof+nXEI9bUVUyV6F53Fp89EuCh2EAA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/appleboy/com v0.1.7 h1:4lYTFNoMAAXGGIC8lDxVg/NY+1aXbYqfAWN05cZhd0M= +github.com/appleboy/com v0.1.7/go.mod h1:JUK+oH0SXCLRH57pDMJx6VWVsm8CPdajalmRSWwamBE= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/aws/aws-sdk-go v1.49.1 h1:Dsamcd8d/nNb3A+bZ0ucfGl0vGZsW5wlRW0vhoYGoeQ= github.com/aws/aws-sdk-go v1.49.1/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= @@ -175,6 +145,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-queue/queue v0.2.0 h1:R0INU16rLCzYmc5h9wqHI/6owNxqcRVVMd5gyKVmnfU= +github.com/golang-queue/queue v0.2.0/go.mod h1:5nEkJTzw9Boc8ZCylQlrJK5f/Vd8Uo58yAssRli5ckg= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -233,6 +205,8 @@ github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=