feat: create a cron job abstraction with a RetryableTask method, RetryableTaskParams struct, CronJob struct, and CreateJob method
This commit is contained in:
parent
b4e2e962e5
commit
1af1ea9505
74
cron/cron.go
74
cron/cron.go
|
@ -2,8 +2,10 @@ package cron
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
|
||||
"github.com/go-co-op/gocron/v2"
|
||||
)
|
||||
|
@ -36,6 +38,22 @@ type CronServiceImpl struct {
|
|||
logger *zap.Logger
|
||||
}
|
||||
|
||||
type RetryableTaskParams struct {
|
||||
Name string
|
||||
Function any
|
||||
Args []any
|
||||
Attempt uint
|
||||
Limit uint
|
||||
After func(jobID uuid.UUID, jobName string)
|
||||
Error func(jobID uuid.UUID, jobName string, err error)
|
||||
}
|
||||
|
||||
type CronJob struct {
|
||||
Job gocron.JobDefinition
|
||||
Task gocron.Task
|
||||
Options []gocron.JobOption
|
||||
}
|
||||
|
||||
func (c *CronServiceImpl) Scheduler() gocron.Scheduler {
|
||||
return c.scheduler
|
||||
}
|
||||
|
@ -71,3 +89,59 @@ func (c *CronServiceImpl) start() error {
|
|||
func (c *CronServiceImpl) RegisterService(service CronableService) {
|
||||
c.services = append(c.services, service)
|
||||
}
|
||||
|
||||
func (c *CronServiceImpl) RetryableTask(params RetryableTaskParams) CronJob {
|
||||
job := gocron.OneTimeJob(gocron.OneTimeJobStartImmediately())
|
||||
|
||||
if params.Attempt > 0 {
|
||||
job = gocron.OneTimeJob(gocron.OneTimeJobStartDateTime(time.Now().Add(time.Duration(params.Attempt) * time.Minute)))
|
||||
}
|
||||
|
||||
task := gocron.NewTask(params.Function, params.Args...)
|
||||
|
||||
afterFunc := params.After
|
||||
if afterFunc == nil {
|
||||
afterFunc = func(jobID uuid.UUID, jobName string) {}
|
||||
}
|
||||
|
||||
errorFunc := params.Error
|
||||
if errorFunc == nil {
|
||||
errorFunc = func(jobID uuid.UUID, jobName string, err error) {}
|
||||
}
|
||||
|
||||
listeners := gocron.WithEventListeners(gocron.AfterJobRunsWithError(func(jobID uuid.UUID, jobName string, err error) {
|
||||
params.Error(jobID, jobName, err)
|
||||
|
||||
if params.Attempt >= params.Limit {
|
||||
c.logger.Error("Retryable task limit reached", zap.String("jobName", jobName), zap.String("jobID", jobID.String()))
|
||||
return
|
||||
}
|
||||
|
||||
taskRetry := params
|
||||
taskRetry.Attempt++
|
||||
|
||||
retryTask := c.RetryableTask(taskRetry)
|
||||
|
||||
_, err = c.CreateJob(retryTask)
|
||||
if err != nil {
|
||||
c.logger.Error("Failed to create retry job", zap.Error(err))
|
||||
}
|
||||
}), gocron.AfterJobRuns(params.After))
|
||||
|
||||
name := gocron.WithName(params.Name)
|
||||
|
||||
return CronJob{
|
||||
Job: job,
|
||||
Task: task,
|
||||
Options: []gocron.JobOption{listeners, name},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CronServiceImpl) CreateJob(job CronJob) (gocron.Job, error) {
|
||||
ret, err := c.Scheduler().NewJob(job.Job, job.Task, job.Options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue