diff --git a/cron/cron.go b/cron/cron.go index 3847f16..8c9d403 100644 --- a/cron/cron.go +++ b/cron/cron.go @@ -2,8 +2,10 @@ package cron import ( "context" + "github.com/google/uuid" "go.uber.org/fx" "go.uber.org/zap" + "time" "github.com/go-co-op/gocron/v2" ) @@ -36,6 +38,22 @@ type CronServiceImpl struct { logger *zap.Logger } +type RetryableTaskParams struct { + Name string + Function any + Args []any + Attempt uint + Limit uint + After func(jobID uuid.UUID, jobName string) + Error func(jobID uuid.UUID, jobName string, err error) +} + +type CronJob struct { + Job gocron.JobDefinition + Task gocron.Task + Options []gocron.JobOption +} + func (c *CronServiceImpl) Scheduler() gocron.Scheduler { return c.scheduler } @@ -71,3 +89,59 @@ func (c *CronServiceImpl) start() error { func (c *CronServiceImpl) RegisterService(service CronableService) { c.services = append(c.services, service) } + +func (c *CronServiceImpl) RetryableTask(params RetryableTaskParams) CronJob { + job := gocron.OneTimeJob(gocron.OneTimeJobStartImmediately()) + + if params.Attempt > 0 { + job = gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(time.Now().Add(time.Duration(params.Attempt) * time.Minute))) + } + + task := gocron.NewTask(params.Function, params.Args...) + + afterFunc := params.After + if afterFunc == nil { + afterFunc = func(jobID uuid.UUID, jobName string) {} + } + + errorFunc := params.Error + if errorFunc == nil { + errorFunc = func(jobID uuid.UUID, jobName string, err error) {} + } + + listeners := gocron.WithEventListeners(gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) { + params.Error(jobID, jobName, err) + + if params.Attempt >= params.Limit { + c.logger.Error("Retryable task limit reached", zap.String("jobName", jobName), zap.String("jobID", jobID.String())) + return + } + + taskRetry := params + taskRetry.Attempt++ + + retryTask := c.RetryableTask(taskRetry) + + _, err = c.CreateJob(retryTask) + if err != nil { + c.logger.Error("Failed to create retry job", zap.Error(err)) + } + }), gocron.AfterJobRuns(params.After)) + + name := gocron.WithName(params.Name) + + return CronJob{ + Job: job, + Task: task, + Options: []gocron.JobOption{listeners, name}, + } +} + +func (c *CronServiceImpl) CreateJob(job CronJob) (gocron.Job, error) { + ret, err := c.Scheduler().NewJob(job.Job, job.Task, job.Options...) + if err != nil { + return nil, err + } + + return ret, nil +}