diff --git a/cli/server/server.go b/cli/server/server.go index 7f95e8f0d..779a5b5bb 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -73,6 +73,10 @@ func (c *command) run(*kingpin.ParseContext) error { // - ctx is canceled g, gCtx := errgroup.WithContext(ctx) + g.Go(func() error { + return system.services.JobScheduler.Run(gCtx) + }) + // start server gHTTP, shutdownHTTP := system.server.ListenAndServe() g.Go(gHTTP.Wait) @@ -116,6 +120,8 @@ func (c *command) run(*kingpin.ParseContext) error { } } + system.services.JobScheduler.WaitJobsDone(shutdownCtx) + log.Info().Msg("wait for subroutines to complete") err = g.Wait() diff --git a/cmd/gitness/wire.go b/cmd/gitness/wire.go index 8d4b0db68..54261520e 100644 --- a/cmd/gitness/wire.go +++ b/cmd/gitness/wire.go @@ -44,6 +44,7 @@ import ( "github.com/harness/gitness/internal/server" "github.com/harness/gitness/internal/services" "github.com/harness/gitness/internal/services/codecomments" + "github.com/harness/gitness/internal/services/job" pullreqservice "github.com/harness/gitness/internal/services/pullreq" "github.com/harness/gitness/internal/services/webhook" "github.com/harness/gitness/internal/store" @@ -102,6 +103,7 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e lock.WireSet, pubsub.WireSet, codecomments.WireSet, + job.WireSet, gitrpccron.WireSet, checkcontroller.WireSet, execution.WireSet, diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index 2ae30f97e..5ab076ada 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -42,6 +42,7 @@ import ( server2 "github.com/harness/gitness/internal/server" "github.com/harness/gitness/internal/services" "github.com/harness/gitness/internal/services/codecomments" + "github.com/harness/gitness/internal/services/job" pullreq2 "github.com/harness/gitness/internal/services/pullreq" "github.com/harness/gitness/internal/services/webhook" "github.com/harness/gitness/internal/store" @@ -202,7 +203,13 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - servicesServices := services.ProvideServices(webhookService, pullreqService) + jobStore := database.ProvideJobStore(db) + executor := job.ProvideExecutor(jobStore, pubSub) + scheduler, err := job.ProvideScheduler(jobStore, executor, mutexManager, pubSub, config) + if err != nil { + return nil, err + } + servicesServices := services.ProvideServices(webhookService, pullreqService, executor, scheduler) serverSystem := server.NewSystem(bootstrapBootstrap, serverServer, grpcServer, manager, servicesServices) return serverSystem, nil } diff --git a/go.mod b/go.mod index bbb1b79a8..f05657001 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.5.9 github.com/google/wire v0.5.0 + github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/gotidy/ptr v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/guregu/null v4.0.0+incompatible diff --git a/go.sum b/go.sum index b2c275ac0..422091da2 100644 --- a/go.sum +++ b/go.sum @@ -254,6 +254,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5 github.com/googleapis/gax-go/v2 v2.7.0 h1:IcsPKeInNvYi7eqSaDjiZqDDKu5rsmunY0Y1YupQSSQ= github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= 
+github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY= +github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= @@ -367,6 +369,7 @@ github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk= github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= diff --git a/internal/services/job/executor.go b/internal/services/job/executor.go new file mode 100644 index 000000000..7f80917d3 --- /dev/null +++ b/internal/services/job/executor.go @@ -0,0 +1,142 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package job + +import ( + "context" + "errors" + "fmt" + "runtime/debug" + "time" + + "github.com/harness/gitness/internal/store" + "github.com/harness/gitness/pubsub" + "github.com/harness/gitness/types" + "github.com/harness/gitness/types/enum" + + "github.com/rs/zerolog/log" +) + +// Executor holds map of Handler objects per each job type registered. +// The Scheduler uses the Executor to start execution of jobs. +type Executor struct { + handlerMap map[string]Handler + handlerComplete bool + store store.JobStore + publisher pubsub.Publisher +} + +const ( + ProgressMin = 0 + ProgressMax = 100 +) + +// ProgressReporter can be used by a job Handler to report back the execution progress. +type ProgressReporter func(progress int, result string) error + +// Handler is a job executor for a specific job type. +// An implementation should try to honor the context and +// try to abort the execution as soon as the context is done. +type Handler interface { + Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error) +} + +var noHandlerDefinedError = errors.New("no handler registered for the job type") + +// NewExecutor creates new Executor. +func NewExecutor(jobStore store.JobStore, publisher pubsub.Publisher) *Executor { + return &Executor{ + handlerMap: make(map[string]Handler), + handlerComplete: false, + store: jobStore, + publisher: publisher, + } +} + +// Register registers a job Handler for the provided job type. +// This function is not thread safe. All calls are expected to be made +// in a single thread during the application boot time. 
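+//
+// A hypothetical registration made during wiring might look like this
+// (the job type string and handler type are illustrative, not part of this change):
+//
+//	if err := executor.Register("gitness:repo:import", &repoImportHandler{}); err != nil {
+//		return err
+//	}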
+func (e *Executor) Register(jobType string, exec Handler) error { + if jobType == "" { + return errors.New("jobType must not be empty") + } + + if e.handlerComplete { + return errors.New("job handler registration is complete") + } + + if exec == nil { + return errors.New("provided Handler is nil") + } + + if _, ok := e.handlerMap[jobType]; ok { + return fmt.Errorf("a Handler is already defined to run the '%s' job types", jobType) + } + + e.handlerMap[jobType] = exec + + return nil +} + +// finishRegistration forbids further registration of job types. +// It is called by the Scheduler when it starts. +func (e *Executor) finishRegistration() { + e.handlerComplete = true +} + +// exec runs a single job. This function is synchronous, +// so the caller is responsible to run it in a separate go-routine. +func (e *Executor) exec( + ctx context.Context, + jobUID, jobType string, + input string, +) (result string, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf( + "panic while processing job=%s type=%s: %v\n%s", + jobUID, jobType, r, debug.Stack()) + } + }() + + exec, ok := e.handlerMap[jobType] + if !ok { + return "", noHandlerDefinedError + } + + // progressReporter is the function with which the job can update its progress. + // This function will be executed in the job executor's Go-routine. + // It uses the job's context. + progressReporter := func(progress int, result string) error { + if progress < ProgressMin || progress > ProgressMax { + return errors.New("progress must be between 0 and 100") + } + + jobDummy := &types.Job{ + UID: jobUID, + Updated: time.Now().UnixMilli(), + Result: result, + State: enum.JobStateRunning, + RunProgress: progress, + } + + // This doesn't need to be behind the global lock because it only updates the single row. + // While a job is running no other process should touch it. + // Even this call will fail if the context deadline has been exceeded. + // The job parameter is a dummy types.Job object that just holds fields that should be updated. + if err := e.store.UpdateProgress(ctx, jobDummy); err != nil { + return err + } + + // tell everybody that a job progress has been updated + if err := publishStateChange(ctx, e.publisher, jobDummy); err != nil { + log.Err(err).Msg("failed to publish job state change") + } + + return nil + } + + return exec.Handle(ctx, input, progressReporter) // runs the job +} diff --git a/internal/services/job/job_overdue.go b/internal/services/job/job_overdue.go new file mode 100644 index 000000000..dbd45a566 --- /dev/null +++ b/internal/services/job/job_overdue.go @@ -0,0 +1,89 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package job + +import ( + "context" + "fmt" + "time" + + "github.com/harness/gitness/internal/store" + "github.com/harness/gitness/lock" + "github.com/harness/gitness/types/enum" + + "github.com/rs/zerolog/log" +) + +const ( + jobUIDOverdue = "gitness:jobs:overdue" + jobTypeOverdue = "gitness:jobs:overdue" + jobCronOverdue = "*/20 * * * *" // every 20 min +) + +type jobOverdue struct { + store store.JobStore + mxManager lock.MutexManager + scheduler *Scheduler +} + +func newJobOverdue(jobStore store.JobStore, mxManager lock.MutexManager, scheduler *Scheduler) *jobOverdue { + return &jobOverdue{ + store: jobStore, + mxManager: mxManager, + scheduler: scheduler, + } +} + +// Handle reclaims overdue jobs. 
Normally this shouldn't happen.
+// But it can occur if the DB update after a job execution fails,
+// or if the server terminates suddenly while the job is still running.
+func (j *jobOverdue) Handle(ctx context.Context, _ string, _ ProgressReporter) (string, error) {
+	mx, err := globalLock(ctx, j.mxManager)
+	if err != nil {
+		return "", fmt.Errorf("failed to obtain the lock to reclaim overdue jobs: %w", err)
+	}
+
+	defer func() {
+		if err := mx.Unlock(ctx); err != nil {
+			log.Err(err).Msg("failed to release global lock after reclaiming overdue jobs")
+		}
+	}()
+
+	overdueJobs, err := j.store.ListDeadlineExceeded(ctx, time.Now())
+	if err != nil {
+		return "", fmt.Errorf("failed to list overdue jobs: %w", err)
+	}
+
+	if len(overdueJobs) == 0 {
+		return "", nil
+	}
+
+	var minScheduled time.Time
+
+	for _, job := range overdueJobs {
+		const errorMessage = "deadline exceeded"
+		postExec(job, "", errorMessage)
+
+		err = j.store.UpdateExecution(ctx, job)
+		if err != nil {
+			return "", fmt.Errorf("failed to update overdue job: %w", err)
+		}
+
+		if job.State == enum.JobStateScheduled {
+			scheduled := time.UnixMilli(job.Scheduled)
+			if minScheduled.IsZero() || minScheduled.After(scheduled) {
+				minScheduled = scheduled
+			}
+		}
+	}
+
+	if !minScheduled.IsZero() {
+		j.scheduler.scheduleProcessing(minScheduled)
+	}
+
+	result := fmt.Sprintf("found %d overdue jobs", len(overdueJobs))
+
+	return result, nil
+}
diff --git a/internal/services/job/job_purge.go b/internal/services/job/job_purge.go
new file mode 100644
index 000000000..0fe757a4a
--- /dev/null
+++ b/internal/services/job/job_purge.go
@@ -0,0 +1,67 @@
+// Copyright 2022 Harness Inc. All rights reserved.
+// Use of this source code is governed by the Polyform Free Trial License
+// that can be found in the LICENSE.md file for this repository.
+
+package job
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/harness/gitness/internal/store"
+	"github.com/harness/gitness/lock"
+
+	"github.com/rs/zerolog/log"
+)
+
+const (
+	jobUIDPurge  = "gitness:jobs:purge"
+	jobTypePurge = "gitness:jobs:purge"
+	jobCronPurge = "15 */4 * * *" // at minute 15 past every 4th hour
+)
+
+type jobPurge struct {
+	store     store.JobStore
+	mxManager lock.MutexManager
+	minOldAge time.Duration
+}
+
+func newJobPurge(jobStore store.JobStore, mxManager lock.MutexManager, minOldAge time.Duration) *jobPurge {
+	if minOldAge < 0 {
+		minOldAge = 0
+	}
+
+	return &jobPurge{
+		store:     jobStore,
+		mxManager: mxManager,
+		minOldAge: minOldAge,
+	}
+}
+
+func (j *jobPurge) Handle(ctx context.Context, _ string, _ ProgressReporter) (string, error) {
+	mx, err := globalLock(ctx, j.mxManager)
+	if err != nil {
+		return "", fmt.Errorf("failed to obtain the lock to clean up old jobs: %w", err)
+	}
+
+	defer func() {
+		if err := mx.Unlock(ctx); err != nil {
+			log.Err(err).Msg("failed to release global lock after cleaning up old jobs")
+		}
+	}()
+
+	olderThan := time.Now().Add(-j.minOldAge)
+
+	n, err := j.store.DeleteOld(ctx, olderThan)
+	if err != nil {
+		return "", fmt.Errorf("failed to purge old jobs: %w", err)
+	}
+
+	result := "no old jobs found"
+	if n > 0 {
+		result = fmt.Sprintf("deleted %d old jobs", n)
+	}
+
+	return result, nil
+}
diff --git a/internal/services/job/lock.go b/internal/services/job/lock.go
new file mode 100644
index 000000000..81210e1fa
--- /dev/null
+++ b/internal/services/job/lock.go
@@ -0,0 +1,23 @@
+// Copyright 2022 Harness Inc. All rights reserved.
+// Use of this source code is governed by the Polyform Free Trial License
+// that can be found in the LICENSE.md file for this repository.
+ +package job + +import ( + "context" + + "github.com/harness/gitness/lock" +) + +func globalLock(ctx context.Context, manager lock.MutexManager) (lock.Mutex, error) { + const lockKey = "jobs" + mx, err := manager.NewMutex(lockKey) + if err != nil { + return nil, err + } + + err = mx.Lock(ctx) + + return mx, err +} diff --git a/internal/services/job/pubsub.go b/internal/services/job/pubsub.go new file mode 100644 index 000000000..8f13b44da --- /dev/null +++ b/internal/services/job/pubsub.go @@ -0,0 +1,60 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package job + +import ( + "bytes" + "context" + "encoding/gob" + "fmt" + + "github.com/harness/gitness/pubsub" + "github.com/harness/gitness/types" +) + +const ( + PubSubTopicCancelJob = "gitness:job:cancel_job" + PubSubTopicStateChange = "gitness:job:state_change" +) + +func encodeStateChange(job *types.Job) ([]byte, error) { + stateChange := &types.JobStateChange{ + UID: job.UID, + State: job.State, + Progress: job.RunProgress, + Result: job.Result, + Failure: job.LastFailureError, + } + + buffer := bytes.NewBuffer(nil) + if err := gob.NewEncoder(buffer).Encode(stateChange); err != nil { + return nil, err + } + + return buffer.Bytes(), nil +} + +func DecodeStateChange(payload []byte) (*types.JobStateChange, error) { + stateChange := &types.JobStateChange{} + if err := gob.NewDecoder(bytes.NewReader(payload)).Decode(stateChange); err != nil { + return nil, err + } + + return stateChange, nil +} + +func publishStateChange(ctx context.Context, publisher pubsub.Publisher, job *types.Job) error { + payload, err := encodeStateChange(job) + if err != nil { + return fmt.Errorf("failed to gob encode JobStateChange: %w", err) + } + + err = publisher.Publish(ctx, PubSubTopicStateChange, payload) + if err != nil { + return fmt.Errorf("failed to publish JobStateChange: %w", err) + } + + return nil +} diff --git a/internal/services/job/scheduler.go b/internal/services/job/scheduler.go new file mode 100644 index 000000000..4c251b18e --- /dev/null +++ b/internal/services/job/scheduler.go @@ -0,0 +1,688 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package job + +import ( + "context" + "errors" + "fmt" + "runtime/debug" + "sync" + "time" + + "github.com/harness/gitness/internal/store" + "github.com/harness/gitness/lock" + "github.com/harness/gitness/pubsub" + "github.com/harness/gitness/types" + "github.com/harness/gitness/types/enum" + + "github.com/gorhill/cronexpr" + "github.com/rs/zerolog/log" +) + +// Scheduler controls execution of background jobs. 
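+// Handlers are registered on the Executor before Run is called, Run is started in its
+// own goroutine at boot, jobs are submitted with RunJob or AddRecurring, and
+// WaitJobsDone is used during graceful shutdown, mirroring the cli/server wiring in this change:
+//
+//	g.Go(func() error { return system.services.JobScheduler.Run(gCtx) })
+//	...
+//	system.services.JobScheduler.WaitJobsDone(shutdownCtx)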
+type Scheduler struct { + // dependencies + store store.JobStore + executor *Executor + mxManager lock.MutexManager + pubsubService pubsub.PubSub + + // configuration fields + instanceID string + maxRunning int + purgeMinOldAge time.Duration + + // synchronization stuff + signal chan time.Time + done chan struct{} + wgRunning sync.WaitGroup + cancelJobMx sync.Mutex + cancelJobMap map[string]context.CancelFunc +} + +func NewScheduler( + jobStore store.JobStore, + executor *Executor, + mxManager lock.MutexManager, + pubsubService pubsub.PubSub, + instanceID string, + maxRunning int, + purgeMinOldAge time.Duration, +) (*Scheduler, error) { + if maxRunning < 1 { + maxRunning = 1 + } + return &Scheduler{ + store: jobStore, + executor: executor, + mxManager: mxManager, + pubsubService: pubsubService, + + instanceID: instanceID, + maxRunning: maxRunning, + purgeMinOldAge: purgeMinOldAge, + + cancelJobMap: map[string]context.CancelFunc{}, + }, nil +} + +// Run runs the background job scheduler. +// It's a blocking call. It blocks until the provided context is done. +func (s *Scheduler) Run(ctx context.Context) error { + if s.done != nil { + return errors.New("already started") + } + + consumer := s.pubsubService.Subscribe(ctx, PubSubTopicCancelJob, s.handleCancelJob) + defer func() { + err := consumer.Close() + if err != nil { + log.Ctx(ctx).Err(err). + Msg("job scheduler: failed to close pubsub cancel job consumer") + } + }() + + if err := s.createNecessaryJobs(ctx); err != nil { + return fmt.Errorf("failed to create necessary jobs: %w", err) + } + + if err := s.registerNecessaryJobs(); err != nil { + return fmt.Errorf("failed to register scheduler's internal jobs: %w", err) + } + + s.executor.finishRegistration() + + log.Ctx(ctx).Debug().Msg("job scheduler: starting") + + s.done = make(chan struct{}) + defer close(s.done) + + s.signal = make(chan time.Time, 1) + + timer := newSchedulerTimer() + defer timer.Stop() + + for { + err := func() error { + defer func() { + if r := recover(); r != nil { + stack := string(debug.Stack()) + log.Ctx(ctx).Error(). + Str("panic", fmt.Sprintf("[%T] job scheduler panic: %v", r, r)). + Msg(stack) + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + + case newTime := <-s.signal: + dur := timer.RescheduleEarlier(newTime) + if dur > 0 { + log.Ctx(ctx).Trace(). + Msgf("job scheduler: update of scheduled job processing time... runs in %s", dur) + } + return nil + + case now := <-timer.Ch(): + count, nextExec, gotAllJobs, err := s.processReadyJobs(ctx, now) + + // If the next processing time isn't known use the default. + if nextExec.IsZero() { + const period = time.Minute + nextExec = now.Add(period) + } + + // Reset the timer. Make the timer edgy if there are more jobs available. + dur := timer.ResetAt(nextExec, !gotAllJobs) + + if err != nil { + log.Ctx(ctx).Err(err). + Msgf("job scheduler: failed to process jobs; next iteration in %s", dur) + } else { + log.Ctx(ctx).Trace(). + Msgf("job scheduler: started %d jobs; next iteration in %s", count, dur) + } + + return nil + } + }() + if err != nil { + return err + } + } +} + +// WaitJobsDone waits until execution of all jobs has finished. +// It is intended to be used for graceful shutdown, after the Run method has finished. +func (s *Scheduler) WaitJobsDone(ctx context.Context) { + log.Ctx(ctx).Debug().Msg("job scheduler: stopping... 
waiting for the currently running jobs to finish")
+
+	ch := make(chan struct{})
+	go func() {
+		s.wgRunning.Wait()
+		close(ch)
+	}()
+
+	select {
+	case <-ctx.Done():
+		log.Ctx(ctx).Warn().Msg("job scheduler: stop interrupted")
+	case <-ch:
+		log.Ctx(ctx).Info().Msg("job scheduler: gracefully stopped")
+	}
+}
+
+func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error {
+	s.cancelJobMx.Lock()
+	cancelFn, ok := s.cancelJobMap[jobUID]
+	s.cancelJobMx.Unlock()
+
+	if ok {
+		cancelFn()
+		return nil
+	}
+
+	return s.pubsubService.Publish(ctx, PubSubTopicCancelJob, []byte(jobUID))
+}
+
+func (s *Scheduler) handleCancelJob(payload []byte) error {
+	jobUID := string(payload)
+	if jobUID == "" {
+		return nil
+	}
+
+	s.cancelJobMx.Lock()
+	cancelFn, ok := s.cancelJobMap[jobUID]
+	s.cancelJobMx.Unlock()
+
+	if ok {
+		cancelFn()
+	}
+
+	return nil
+}
+
+// scheduleProcessing triggers processing of ready jobs.
+// This should be run after adding new jobs to the database.
+func (s *Scheduler) scheduleProcessing(scheduled time.Time) {
+	go func() {
+		select {
+		case <-s.done:
+		case s.signal <- scheduled:
+		}
+	}()
+}
+
+// scheduleIfHaveMoreJobs triggers processing of ready jobs if the timer is edgy.
+// The timer would be edgy if the previous iteration found more jobs than it could start (full capacity).
+// This should be run after a non-recurring job has finished.
+func (s *Scheduler) scheduleIfHaveMoreJobs() {
+	s.scheduleProcessing(time.Time{}) // zero time will trigger the timer if it's edgy
+}
+
+// RunJob runs a single job of the provided type.
+// All parameters a job receives must be inside the data string (as JSON or whatever the job handler can interpret).
+// The caller gets the job UID which can then be used to track execution (the job state and progress percentage).
+func (s *Scheduler) RunJob(ctx context.Context,
+	jobType string,
+	maxRetries int,
+	timeout time.Duration,
+	data string,
+) (string, error) {
+	if jobType == "" {
+		return "", errors.New("jobType must not be empty")
+	}
+
+	if maxRetries < 0 {
+		return "", errors.New("maxRetries must not be negative")
+	}
+
+	if timeout < time.Second {
+		return "", errors.New("timeout too short")
+	}
+
+	uid, err := UID()
+	if err != nil {
+		return "", fmt.Errorf("failed to generate job UID: %w", err)
+	}
+
+	nowMilli := time.Now().UnixMilli()
+
+	job := &types.Job{
+		UID:                 uid,
+		Created:             nowMilli,
+		Updated:             nowMilli,
+		Type:                jobType,
+		Priority:            enum.JobPriorityNormal,
+		Data:                data,
+		Result:              "",
+		MaxDurationSeconds:  int(timeout / time.Second),
+		MaxRetries:          maxRetries,
+		State:               enum.JobStateScheduled,
+		Scheduled:           nowMilli,
+		TotalExecutions:     0,
+		RunBy:               "",
+		RunDeadline:         nowMilli,
+		RunProgress:         ProgressMin,
+		LastExecuted:        0, // never executed
+		IsRecurring:         false,
+		RecurringCron:       "",
+		ConsecutiveFailures: 0,
+		LastFailureError:    "",
+	}
+
+	ctx = log.Ctx(ctx).With().
+		Str("job.UID", job.UID).
+		Str("job.Type", job.Type).
+		Logger().WithContext(ctx)
+
+	mx, err := globalLock(ctx, s.mxManager)
+	if err != nil {
+		return "", fmt.Errorf("failed to obtain global lock to add new job: %w", err)
+	}
+
+	defer func() {
+		if err := mx.Unlock(ctx); err != nil {
+			log.Ctx(ctx).Err(err).Msg("failed to release global lock after adding a new job")
+		}
+	}()
+
+	err = s.store.Create(ctx, job)
+	if err != nil {
+		return "", fmt.Errorf("failed to add new job to the database: %w", err)
+	}
+
+	available, err := s.availableSlots(ctx)
+	if err != nil {
+		return "", fmt.Errorf("failed to count available slots for job execution: %w", err)
+	}
+
+	if available == 0 {
+		// already running at full capacity: the job is scheduled, we're done.
+		return uid, nil
+	}
+
+	err = s.runJob(ctx, job)
+	if err != nil {
+		log.Ctx(ctx).Err(err).Msg("failed to run job")
+	}
+
+	return uid, nil
+}
+
+// processReadyJobs executes jobs that are ready to run. This function is periodically run by the Scheduler.
+// The function returns the number of jobs it has started, the next scheduled execution time (of this function)
+// and a bool value indicating whether all currently available ready jobs were started.
+// Internally the Scheduler uses an "edgy" timer to reschedule calls of this function.
+// The edgy option of the timer will be on if this function hasn't been able to start all jobs that are ready to run.
+// If the edgy option is turned on, the timer will be triggered (and thus this function will be called)
+// when any currently running job finishes, successfully or not.
+func (s *Scheduler) processReadyJobs(ctx context.Context, now time.Time) (int, time.Time, bool, error) {
+	mx, err := globalLock(ctx, s.mxManager)
+	if err != nil {
+		return 0, time.Time{}, false,
+			fmt.Errorf("failed to obtain global lock to periodically process ready jobs: %w", err)
+	}
+
+	defer func() {
+		if err := mx.Unlock(ctx); err != nil {
+			log.Ctx(ctx).Err(err).
+				Msg("failed to release global lock after periodic processing of ready jobs")
+		}
+	}()
+
+	availableCount, err := s.availableSlots(ctx)
+	if err != nil {
+		return 0, time.Time{}, false,
+			fmt.Errorf("failed to count available slots for job execution: %w", err)
+	}
+
+	// fetch one over the limit to check whether all ready jobs have been fetched
+	jobs, err := s.store.ListReady(ctx, now, availableCount+1)
+	if err != nil {
+		return 0, time.Time{}, false,
+			fmt.Errorf("failed to load scheduled jobs: %w", err)
+	}
+
+	var (
+		countExecuted     int
+		knownNextExecTime time.Time
+		gotAllJobs        bool
+	)
+
+	if len(jobs) > availableCount {
+		// More jobs are ready than we are able to run.
+		jobs = jobs[:availableCount]
+	} else {
+		gotAllJobs = true
+		knownNextExecTime, err = s.store.NextScheduledTime(ctx, now)
+		if err != nil {
+			return 0, time.Time{}, false,
+				fmt.Errorf("failed to read next scheduled time: %w", err)
+		}
+	}
+
+	for _, job := range jobs {
+		jobCtx := log.Ctx(ctx).With().
+			Str("job.UID", job.UID).
+			Str("job.Type", job.Type).
+ Logger().WithContext(ctx) + + err = s.runJob(jobCtx, job) + if err != nil { + knownNextExecTime = time.Time{} + gotAllJobs = false + log.Ctx(jobCtx).Err(err).Msg("failed to run job") + continue + } + + countExecuted++ + } + + return countExecuted, knownNextExecTime, gotAllJobs, nil +} + +func (s *Scheduler) availableSlots(ctx context.Context) (int, error) { + countRunning, err := s.store.CountRunning(ctx) + if err != nil { + return 0, err + } + + availableCount := s.maxRunning - countRunning + if availableCount < 0 { + return 0, nil + } + + return availableCount, nil +} + +// runJob updates the job in the database and starts it in a separate goroutine. +// The function will also log the execution. +func (s *Scheduler) runJob(ctx context.Context, job *types.Job) error { + // Update the job fields for the new execution + s.preExec(job) + + if err := s.store.UpdateExecution(ctx, job); err != nil { + return err + } + + // tell everybody that a job has started + if err := publishStateChange(ctx, s.pubsubService, job); err != nil { + log.Err(err).Msg("failed to publish job state change") + } + + s.wgRunning.Add(1) + go func(jobCtx context.Context, job *types.Job) { + defer s.wgRunning.Done() + + log.Ctx(jobCtx).Debug().Msg("started job") + + timeStart := time.Now() + + // Run the job + execResult, execFailure := s.doExec(jobCtx, job) + + // Update the job fields, reschedule if necessary. + postExec(job, execResult, execFailure) + + // Use the context.Background() because we want to update the job even if the job's context is done. + // The context can be done because the job exceeded its deadline or the server is shutting down. + backgroundCtx := context.Background() + + // tell everybody that a job has finished execution + if err := publishStateChange(backgroundCtx, s.pubsubService, job); err != nil { + log.Ctx(jobCtx).Err(err).Msg("failed to publish job state change") + } + + if mx, err := globalLock(backgroundCtx, s.mxManager); err != nil { + // If locking failed, just log the error and proceed to update the DB anyway. + log.Ctx(jobCtx).Err(err).Msg("failed to obtain global lock to update job after execution") + } else { + defer func() { + if err := mx.Unlock(backgroundCtx); err != nil { + log.Ctx(jobCtx).Err(err).Msg("failed to release global lock to update job after execution") + } + }() + } + + if err := s.store.UpdateExecution(backgroundCtx, job); err != nil { + log.Ctx(jobCtx).Err(err).Msg("failed to update after execution") + return + } + + logInfo := log.Ctx(jobCtx).Info().Str("duration", time.Since(timeStart).String()) + + if job.IsRecurring { + logInfo = logInfo.Bool("job.IsRecurring", true) + } + if job.Result != "" { + logInfo = logInfo.Str("job.Result", job.Result) + } + if job.LastFailureError != "" { + logInfo = logInfo.Str("job.Failure", job.LastFailureError) + } + + switch job.State { + case enum.JobStateFinished: + logInfo.Msg("job successfully finished") + s.scheduleIfHaveMoreJobs() + + case enum.JobStateFailed: + logInfo.Msg("job failed") + s.scheduleIfHaveMoreJobs() + + case enum.JobStateScheduled: + scheduledTime := time.UnixMilli(job.Scheduled) + logInfo. + Str("job.Scheduled", scheduledTime.Format(time.RFC3339Nano)). + Msg("job finished and rescheduled") + + s.scheduleProcessing(scheduledTime) + + case enum.JobStateRunning: + log.Ctx(jobCtx).Error().Msg("should not happen; job still has state=running after finishing") + } + }(ctx, job) + + return nil +} + +// preExec updates the provided types.Job before execution. 
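+// For example, a job with MaxDurationSeconds=30 that is picked up now ends up with
+// State=running, RunBy set to this instance's ID and RunDeadline roughly 30s in the
+// future (values are illustrative).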
+func (s *Scheduler) preExec(job *types.Job) { + if job.MaxDurationSeconds < 1 { + job.MaxDurationSeconds = 1 + } + + now := time.Now() + nowMilli := now.UnixMilli() + + execDuration := time.Duration(job.MaxDurationSeconds) * time.Second + execDeadline := now.Add(execDuration) + + job.Updated = nowMilli + job.LastExecuted = nowMilli + job.State = enum.JobStateRunning + job.RunDeadline = execDeadline.UnixMilli() + job.RunBy = s.instanceID + job.RunProgress = ProgressMin + job.TotalExecutions++ + job.Result = "" + job.LastFailureError = "" +} + +// doExec executes the provided types.Job. +func (s *Scheduler) doExec(ctx context.Context, job *types.Job) (execResult, execError string) { + execDeadline := time.UnixMilli(job.RunDeadline) + + jobCtx, done := context.WithDeadline(ctx, execDeadline) + defer done() + + s.cancelJobMx.Lock() + if _, ok := s.cancelJobMap[job.UID]; ok { + // should not happen: jobs have unique UIDs! + s.cancelJobMx.Unlock() + return "", "failed to start: already running" + } + s.cancelJobMap[job.UID] = done + s.cancelJobMx.Unlock() + + defer func() { + s.cancelJobMx.Lock() + delete(s.cancelJobMap, job.UID) + s.cancelJobMx.Unlock() + }() + + execResult, err := s.executor.exec(jobCtx, job.UID, job.Type, job.Data) + if err != nil { + execError = err.Error() + } + + return +} + +// postExec updates the provided types.Job after execution and reschedules it if necessary. +func postExec(job *types.Job, resultData, resultErr string) { + now := time.Now() + nowMilli := now.UnixMilli() + + job.Updated = nowMilli + job.Result = resultData + job.RunBy = "" + + if resultErr != "" { + job.ConsecutiveFailures++ + job.State = enum.JobStateFailed + job.LastFailureError = resultErr + } else { + job.State = enum.JobStateFinished + job.RunProgress = ProgressMax + } + + // Reschedule recurring jobs + if job.IsRecurring { + if resultErr == "" { + job.ConsecutiveFailures = 0 + } + + exp, err := cronexpr.Parse(job.RecurringCron) + if err != nil { + job.State = enum.JobStateFailed + + messages := fmt.Sprintf("failed to parse cron string: %s", err.Error()) + if job.LastFailureError != "" { + messages = messages + "; " + job.LastFailureError + } + + job.LastFailureError = messages + } else { + job.State = enum.JobStateScheduled + job.Scheduled = exp.Next(now).UnixMilli() + } + + return + } + + // Reschedule the failed job if retrying is allowed + if job.State == enum.JobStateFailed && job.ConsecutiveFailures <= job.MaxRetries { + const retryDelay = 15 * time.Second + job.State = enum.JobStateScheduled + job.Scheduled = now.Add(retryDelay).UnixMilli() + job.RunProgress = ProgressMin + } +} + +func (s *Scheduler) AddRecurring( + ctx context.Context, + jobUID, + jobType, + cronDef string, + maxDur time.Duration, +) error { + cronExp, err := cronexpr.Parse(cronDef) + if err != nil { + return fmt.Errorf("invalid cron definition string for job type=%s: %w", jobType, err) + } + + now := time.Now() + nowMilli := now.UnixMilli() + + nextExec := cronExp.Next(now) + + job := &types.Job{ + UID: jobUID, + Created: nowMilli, + Updated: nowMilli, + Type: jobType, + Priority: enum.JobPriorityElevated, + Data: "", + Result: "", + MaxDurationSeconds: int(maxDur / time.Second), + MaxRetries: 0, + State: enum.JobStateScheduled, + Scheduled: nextExec.UnixMilli(), + TotalExecutions: 0, + RunBy: "", + RunDeadline: 0, + RunProgress: 0, + LastExecuted: 0, + IsRecurring: true, + RecurringCron: cronDef, + ConsecutiveFailures: 0, + LastFailureError: "", + } + + err = s.store.Upsert(ctx, job) + if err != nil { + return 
fmt.Errorf("failed to upsert job id=%s type=%s: %w", jobUID, jobType, err)
+	}
+
+	return nil
+}
+
+func (s *Scheduler) createNecessaryJobs(ctx context.Context) error {
+	mx, err := globalLock(ctx, s.mxManager)
+	if err != nil {
+		return fmt.Errorf("failed to obtain global lock to create necessary jobs: %w", err)
+	}
+
+	defer func() {
+		if err := mx.Unlock(ctx); err != nil {
+			log.Ctx(ctx).Err(err).
+				Msg("failed to release global lock after creating necessary jobs")
+		}
+	}()
+
+	err = s.AddRecurring(ctx, jobUIDPurge, jobTypePurge, jobCronPurge, 5*time.Second)
+	if err != nil {
+		return err
+	}
+
+	err = s.AddRecurring(ctx, jobUIDOverdue, jobTypeOverdue, jobCronOverdue, 5*time.Second)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// registerNecessaryJobs registers two jobs: overdue job recovery and purging of old finished jobs.
+// These two job types are an integral part of the job scheduler.
+func (s *Scheduler) registerNecessaryJobs() error {
+	handlerOverdue := newJobOverdue(s.store, s.mxManager, s)
+	err := s.executor.Register(jobTypeOverdue, handlerOverdue)
+	if err != nil {
+		return err
+	}
+
+	handlerPurge := newJobPurge(s.store, s.mxManager, s.purgeMinOldAge)
+	err = s.executor.Register(jobTypePurge, handlerPurge)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/services/job/timer.go b/internal/services/job/timer.go
new file mode 100644
index 000000000..dfc93f70a
--- /dev/null
+++ b/internal/services/job/timer.go
@@ -0,0 +1,111 @@
+// Copyright 2022 Harness Inc. All rights reserved.
+// Use of this source code is governed by the Polyform Free Trial License
+// that can be found in the LICENSE.md file for this repository.
+
+package job
+
+import (
+	"time"
+)
+
+const timerMaxDur = 30 * time.Minute
+const timerMinDur = time.Nanosecond
+
+type schedulerTimer struct {
+	timerAt time.Time
+	timer   *time.Timer
+	edgy    bool // if true, the next RescheduleEarlier call will trigger the timer immediately.
+}
+
+// newSchedulerTimer creates a new timer for the Scheduler. It is created to fire immediately.
+func newSchedulerTimer() *schedulerTimer {
+	return &schedulerTimer{
+		timerAt: time.Now().Add(timerMinDur),
+		timer:   time.NewTimer(timerMinDur),
+	}
+}
+
+// ResetAt resets the internal timer to trigger at the provided time, clamping the wait
+// between timerMinDur and timerMaxDur; a zero or past time fires the timer almost immediately.
+func (t *schedulerTimer) ResetAt(next time.Time, edgy bool) time.Duration {
+	return t.resetAt(time.Now(), next, edgy)
+}
+
+func (t *schedulerTimer) resetAt(now, next time.Time, edgy bool) time.Duration {
+	var dur time.Duration
+
+	dur = next.Sub(now)
+	if dur < timerMinDur {
+		dur = timerMinDur
+		next = now.Add(dur)
+	} else if dur > timerMaxDur {
+		dur = timerMaxDur
+		next = now.Add(dur)
+	}
+
+	t.Stop()
+	t.edgy = edgy
+	t.timerAt = next
+	t.timer.Reset(dur)
+
+	return dur
+}
+
+// RescheduleEarlier will reset the timer if the new time is earlier than the previous time.
+// Otherwise, the function does nothing and returns 0.
+// Providing zero time triggers the timer if it's edgy, otherwise does nothing.
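+//
+// For example (durations are illustrative): after ResetAt(now.Add(time.Minute), false),
+// calling RescheduleEarlier(now.Add(10*time.Second)) moves the trigger to ~10s from now,
+// while RescheduleEarlier(now.Add(2*time.Minute)) returns 0 and changes nothing.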
+func (t *schedulerTimer) RescheduleEarlier(next time.Time) time.Duration { + return t.rescheduleEarlier(time.Now(), next) +} + +func (t *schedulerTimer) rescheduleEarlier(now, next time.Time) time.Duration { + var dur time.Duration + + switch { + case t.edgy: + // if the timer is edgy trigger it immediately + dur = timerMinDur + + case next.IsZero(): + // if the provided time is zero: trigger the timer if it's edgy otherwise do nothing + if !t.edgy { + return 0 + } + dur = timerMinDur + + case !next.Before(t.timerAt): + // do nothing if the timer is already scheduled to run sooner than the provided time + return 0 + + default: + dur = next.Sub(now) + if dur < timerMinDur { + dur = timerMinDur + } + } + + next = now.Add(dur) + + t.Stop() + t.timerAt = next + t.timer.Reset(dur) + + return dur +} + +func (t *schedulerTimer) Ch() <-chan time.Time { + return t.timer.C +} + +func (t *schedulerTimer) Stop() { + // stop the timer + t.timer.Stop() + + // consume the timer's tick if any + select { + case <-t.timer.C: + default: + } + + t.timerAt = time.Time{} +} diff --git a/internal/services/job/timer_test.go b/internal/services/job/timer_test.go new file mode 100644 index 000000000..fe38b0f7c --- /dev/null +++ b/internal/services/job/timer_test.go @@ -0,0 +1,105 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package job + +import ( + "testing" + "time" +) + +func TestSchedulerTimer_ResetAt(t *testing.T) { + now := time.Now() + tests := []struct { + name string + at time.Time + exp time.Duration + }{ + { + name: "zero", + at: time.Time{}, + exp: timerMinDur, + }, + { + name: "immediate", + at: now, + exp: timerMinDur, + }, + { + name: "30s", + at: now.Add(30 * time.Second), + exp: 30 * time.Second, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + timer := newSchedulerTimer() + dur := timer.resetAt(now, test.at, false) + if want, got := test.exp, dur; want != dur { + t.Errorf("want: %s, got: %s", want.String(), got.String()) + } + }) + } +} + +func TestSchedulerTimer_TryResetAt(t *testing.T) { + now := time.Now() + tests := []struct { + name string + at time.Time + edgy bool + exp time.Duration + }{ + { + name: "past", + at: now.Add(-time.Second), + exp: timerMinDur, + }, + { + name: "30s", + at: now.Add(30 * time.Second), + exp: 30 * time.Second, + }, + { + name: "90s", + at: now.Add(90 * time.Second), + exp: 0, + }, + { + name: "30s-edgy", + at: now.Add(30 * time.Second), + edgy: true, + exp: timerMinDur, + }, + { + name: "90s-edgy", + at: now.Add(90 * time.Second), + edgy: true, + exp: timerMinDur, + }, + { + name: "zero", + at: time.Time{}, + exp: 0, + }, + { + name: "zero-edgy", + at: time.Time{}, + edgy: true, + exp: timerMinDur, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + timer := newSchedulerTimer() + timer.resetAt(now, now.Add(time.Minute), test.edgy) + dur := timer.rescheduleEarlier(now, test.at) + if want, got := test.exp, dur; want != dur { + t.Errorf("want: %s, got: %s", want.String(), got.String()) + } + }) + } +} diff --git a/internal/services/job/uid.go b/internal/services/job/uid.go new file mode 100644 index 000000000..ac23c34a1 --- /dev/null +++ b/internal/services/job/uid.go @@ -0,0 +1,25 @@ +// Copyright 2022 Harness Inc. All rights reserved. 
+// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package job + +import ( + "crypto/rand" + "encoding/base32" +) + +// UID returns unique random string with length equal to 16. +func UID() (string, error) { + const uidSizeBytes = 10 // must be divisible by 5, the resulting string length will be uidSizeBytes/5*8 + + var buf [uidSizeBytes]byte + _, err := rand.Read(buf[:]) + if err != nil { + return "", err + } + + uid := base32.StdEncoding.EncodeToString(buf[:]) + + return uid, nil +} diff --git a/internal/services/job/wire.go b/internal/services/job/wire.go new file mode 100644 index 000000000..0a5c49a5b --- /dev/null +++ b/internal/services/job/wire.go @@ -0,0 +1,47 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package job + +import ( + "github.com/harness/gitness/internal/store" + "github.com/harness/gitness/lock" + "github.com/harness/gitness/pubsub" + "github.com/harness/gitness/types" + + "github.com/google/wire" +) + +var WireSet = wire.NewSet( + ProvideExecutor, + ProvideScheduler, +) + +func ProvideExecutor( + jobStore store.JobStore, + pubsubService pubsub.PubSub, +) *Executor { + return NewExecutor( + jobStore, + pubsubService, + ) +} + +func ProvideScheduler( + jobStore store.JobStore, + executor *Executor, + mutexManager lock.MutexManager, + pubsubService pubsub.PubSub, + config *types.Config, +) (*Scheduler, error) { + return NewScheduler( + jobStore, + executor, + mutexManager, + pubsubService, + config.InstanceID, + config.BackgroundJobs.MaxRunning, + config.BackgroundJobs.PurgeFinishedOlderThan, + ) +} diff --git a/internal/services/services.go b/internal/services/services.go index 70985dae3..1cb5efedd 100644 --- a/internal/services/services.go +++ b/internal/services/services.go @@ -5,6 +5,7 @@ package services import ( + "github.com/harness/gitness/internal/services/job" "github.com/harness/gitness/internal/services/pullreq" "github.com/harness/gitness/internal/services/webhook" @@ -16,16 +17,22 @@ var WireSet = wire.NewSet( ) type Services struct { - ws *webhook.Service - bms *pullreq.Service + Webhook *webhook.Service + PullReq *pullreq.Service + JobExecutor *job.Executor + JobScheduler *job.Scheduler } func ProvideServices( - ws *webhook.Service, - bms *pullreq.Service, + webhooksSrv *webhook.Service, + pullReqSrv *pullreq.Service, + jobExecutor *job.Executor, + jobScheduler *job.Scheduler, ) Services { return Services{ - ws: ws, - bms: bms, + Webhook: webhooksSrv, + PullReq: pullReqSrv, + JobExecutor: jobExecutor, + JobScheduler: jobScheduler, } } diff --git a/internal/store/database.go b/internal/store/database.go index 39549c4ed..ca0918d6f 100644 --- a/internal/store/database.go +++ b/internal/store/database.go @@ -441,6 +441,42 @@ type ( Delete(ctx context.Context, repoID, reqCheckID int64) error } + JobStore interface { + // Find fetches a job by its unique identifier. + Find(ctx context.Context, uid string) (*types.Job, error) + + // Create is used to create a new job. + Create(ctx context.Context, job *types.Job) error + + // Upsert will insert the job in the database if the job didn't already exist, + // or it will update the existing one but only if its definition has changed. + Upsert(ctx context.Context, job *types.Job) error + + // UpdateDefinition is used to update a job definition. 
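+		// A job's definition covers fields such as its type, priority, data,
+		// max duration, max retries and the recurring cron expression.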
+ UpdateDefinition(ctx context.Context, job *types.Job) error + + // UpdateExecution is used to update a job before and after execution. + UpdateExecution(ctx context.Context, job *types.Job) error + + // UpdateProgress is used to update a job progress data. + UpdateProgress(ctx context.Context, job *types.Job) error + + // CountRunning returns number of jobs that are currently being run. + CountRunning(ctx context.Context) (int, error) + + // ListReady returns a list of jobs that are ready for execution. + ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error) + + // ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline. + ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error) + + // NextScheduledTime returns a scheduled time of the next ready job. + NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error) + + // DeleteOld removes non-recurring jobs that have finished execution or have failed. + DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) + } + PipelineStore interface { // Find returns a pipeline given a pipeline ID from the datastore. Find(ctx context.Context, id int64) (*types.Pipeline, error) diff --git a/internal/store/database/job.go b/internal/store/database/job.go new file mode 100644 index 000000000..3e40075a6 --- /dev/null +++ b/internal/store/database/job.go @@ -0,0 +1,430 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package database + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/harness/gitness/internal/store" + gitness_store "github.com/harness/gitness/store" + "github.com/harness/gitness/store/database" + "github.com/harness/gitness/store/database/dbtx" + "github.com/harness/gitness/types" + "github.com/harness/gitness/types/enum" + + "github.com/jmoiron/sqlx" +) + +var _ store.JobStore = (*JobStore)(nil) + +func NewJobStore(db *sqlx.DB) *JobStore { + return &JobStore{ + db: db, + } +} + +type JobStore struct { + db *sqlx.DB +} + +const ( + jobColumns = ` + job_uid + ,job_created + ,job_updated + ,job_type + ,job_priority + ,job_data + ,job_result + ,job_max_duration_seconds + ,job_max_retries + ,job_state + ,job_scheduled + ,job_total_executions + ,job_run_by + ,job_run_deadline + ,job_run_progress + ,job_last_executed + ,job_is_recurring + ,job_recurring_cron + ,job_consecutive_failures + ,job_last_failure_error` + + jobSelectBase = ` + SELECT` + jobColumns + ` + FROM jobs` +) + +// Find fetches a job by its unique identifier. +func (s *JobStore) Find(ctx context.Context, uid string) (*types.Job, error) { + const sqlQuery = jobSelectBase + ` + WHERE job_uid = $1` + + db := dbtx.GetAccessor(ctx, s.db) + + result := &types.Job{} + if err := db.GetContext(ctx, result, sqlQuery, uid); err != nil { + return nil, database.ProcessSQLErrorf(err, "Failed to find job by uid") + } + + return result, nil +} + +// Create creates a new job. 
+func (s *JobStore) Create(ctx context.Context, job *types.Job) error { + const sqlQuery = ` + INSERT INTO jobs (` + jobColumns + ` + ) VALUES ( + :job_uid + ,:job_created + ,:job_updated + ,:job_type + ,:job_priority + ,:job_data + ,:job_result + ,:job_max_duration_seconds + ,:job_max_retries + ,:job_state + ,:job_scheduled + ,:job_total_executions + ,:job_run_by + ,:job_run_deadline + ,:job_run_progress + ,:job_last_executed + ,:job_is_recurring + ,:job_recurring_cron + ,:job_consecutive_failures + ,:job_last_failure_error + )` + + db := dbtx.GetAccessor(ctx, s.db) + + query, arg, err := db.BindNamed(sqlQuery, job) + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to bind job object") + } + + if _, err := db.ExecContext(ctx, query, arg...); err != nil { + return database.ProcessSQLErrorf(err, "Insert query failed") + } + + return nil +} + +// Upsert creates or updates a job. If the job didn't exist it will insert it in the database, +// otherwise it will update it but only if its definition has changed. +func (s *JobStore) Upsert(ctx context.Context, job *types.Job) error { + const sqlQuery = ` + INSERT INTO jobs (` + jobColumns + ` + ) VALUES ( + :job_uid + ,:job_created + ,:job_updated + ,:job_type + ,:job_priority + ,:job_data + ,:job_result + ,:job_max_duration_seconds + ,:job_max_retries + ,:job_state + ,:job_scheduled + ,:job_total_executions + ,:job_run_by + ,:job_run_deadline + ,:job_run_progress + ,:job_last_executed + ,:job_is_recurring + ,:job_recurring_cron + ,:job_consecutive_failures + ,:job_last_failure_error + ) + ON CONFLICT (job_uid) DO + UPDATE SET + job_updated = :job_updated + ,job_type = :job_type + ,job_priority = :job_priority + ,job_data = :job_data + ,job_result = :job_result + ,job_max_duration_seconds = :job_max_duration_seconds + ,job_max_retries = :job_max_retries + ,job_state = :job_state + ,job_scheduled = :job_scheduled + ,job_is_recurring = :job_is_recurring + ,job_recurring_cron = :job_recurring_cron + WHERE + jobs.job_type <> :job_type OR + jobs.job_priority <> :job_priority OR + jobs.job_data <> :job_data OR + jobs.job_max_duration_seconds <> :job_max_duration_seconds OR + jobs.job_max_retries <> :job_max_retries OR + jobs.job_is_recurring <> :job_is_recurring OR + jobs.job_recurring_cron <> :job_recurring_cron` + + db := dbtx.GetAccessor(ctx, s.db) + + query, arg, err := db.BindNamed(sqlQuery, job) + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to bind job object") + } + + if _, err := db.ExecContext(ctx, query, arg...); err != nil { + return database.ProcessSQLErrorf(err, "Upsert query failed") + } + + return nil +} + +// UpdateDefinition is used to update a job definition. +func (s *JobStore) UpdateDefinition(ctx context.Context, job *types.Job) error { + const sqlQuery = ` + UPDATE jobs + SET + job_updated = :job_updated + ,job_type = :job_type + ,job_priority = :job_priority + ,job_data = :job_data + ,job_result = :job_result + ,job_max_duration_seconds = :job_max_duration_seconds + ,job_max_retries = :job_max_retries + ,job_state = :job_state + ,job_scheduled = :job_scheduled + ,job_is_recurring = :job_is_recurring + ,job_recurring_cron = :job_recurring_cron + WHERE job_uid = :job_uid` + + db := dbtx.GetAccessor(ctx, s.db) + + query, arg, err := db.BindNamed(sqlQuery, job) + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to bind job object for update") + } + + result, err := db.ExecContext(ctx, query, arg...) 
+ if err != nil { + return database.ProcessSQLErrorf(err, "Failed to update job definition") + } + + count, err := result.RowsAffected() + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to get number of updated rows") + } + + if count == 0 { + return gitness_store.ErrResourceNotFound + } + + return nil +} + +// UpdateExecution is used to update a job before and after execution. +func (s *JobStore) UpdateExecution(ctx context.Context, job *types.Job) error { + const sqlQuery = ` + UPDATE jobs + SET + job_updated = :job_updated + ,job_result = :job_result + ,job_state = :job_state + ,job_scheduled = :job_scheduled + ,job_total_executions = :job_total_executions + ,job_run_by = :job_run_by + ,job_run_deadline = :job_run_deadline + ,job_last_executed = :job_last_executed + ,job_consecutive_failures = :job_consecutive_failures + ,job_last_failure_error = :job_last_failure_error + WHERE job_uid = :job_uid` + + db := dbtx.GetAccessor(ctx, s.db) + + query, arg, err := db.BindNamed(sqlQuery, job) + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to bind job object for update") + } + + result, err := db.ExecContext(ctx, query, arg...) + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to update job execution") + } + + count, err := result.RowsAffected() + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to get number of updated rows") + } + + if count == 0 { + return gitness_store.ErrResourceNotFound + } + + return nil +} + +func (s *JobStore) UpdateProgress(ctx context.Context, job *types.Job) error { + const sqlQuery = ` + UPDATE jobs + SET + job_updated = :job_updated + ,job_result = :job_result + ,job_run_progress = :job_run_progress + WHERE job_uid = :job_uid AND job_state = 'running'` + + db := dbtx.GetAccessor(ctx, s.db) + + query, arg, err := db.BindNamed(sqlQuery, job) + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to bind job object for update") + } + + result, err := db.ExecContext(ctx, query, arg...) + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to update job progress") + } + + count, err := result.RowsAffected() + if err != nil { + return database.ProcessSQLErrorf(err, "Failed to get number of updated rows") + } + + if count == 0 { + return gitness_store.ErrResourceNotFound + } + + return nil +} + +// CountRunning returns number of jobs that are currently being run. +func (s *JobStore) CountRunning(ctx context.Context) (int, error) { + stmt := database.Builder. + Select("count(*)"). + From("jobs"). + Where("job_state = ?", enum.JobStateRunning) + + sql, args, err := stmt.ToSql() + if err != nil { + return 0, fmt.Errorf("failed to convert count running jobs query to sql: %w", err) + } + + db := dbtx.GetAccessor(ctx, s.db) + + var count int64 + err = db.QueryRowContext(ctx, sql, args...).Scan(&count) + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed executing count running jobs query") + } + + return int(count), nil +} + +// ListReady returns a list of jobs that are ready for execution: +// The jobs with state="scheduled" and scheduled time in the past. +func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error) { + stmt := database.Builder. + Select(jobColumns). + From("jobs"). + Where("job_state = ?", enum.JobStateScheduled). + Where("job_scheduled <= ?", now.UnixMilli()). + OrderBy("job_priority desc, job_scheduled asc, job_uid asc"). 
+ Limit(uint64(limit)) + + sql, args, err := stmt.ToSql() + if err != nil { + return nil, fmt.Errorf("failed to convert list scheduled jobs query to sql: %w", err) + } + + result := make([]*types.Job, 0) + + db := dbtx.GetAccessor(ctx, s.db) + + if err = db.SelectContext(ctx, &result, sql, args...); err != nil { + return nil, database.ProcessSQLErrorf(err, "failed to execute list scheduled jobs query") + } + + return result, nil +} + +// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline. +func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error) { + stmt := database.Builder. + Select(jobColumns). + From("jobs"). + Where("job_state = ?", enum.JobStateRunning). + Where("job_run_deadline < ?", now.UnixMilli()). + OrderBy("job_run_deadline asc") + + sql, args, err := stmt.ToSql() + if err != nil { + return nil, fmt.Errorf("failed to convert list overdue jobs query to sql: %w", err) + } + + result := make([]*types.Job, 0) + + db := dbtx.GetAccessor(ctx, s.db) + + if err = db.SelectContext(ctx, &result, sql, args...); err != nil { + return nil, database.ProcessSQLErrorf(err, "failed to execute list overdue jobs query") + } + + return result, nil +} + +// NextScheduledTime returns a scheduled time of the next ready job or zero time if no such job exists. +func (s *JobStore) NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error) { + stmt := database.Builder. + Select("job_scheduled"). + From("jobs"). + Where("job_state = ?", enum.JobStateScheduled). + Where("job_scheduled > ?", now.UnixMilli()). + OrderBy("job_scheduled asc"). + Limit(1) + + query, args, err := stmt.ToSql() + if err != nil { + return time.Time{}, fmt.Errorf("failed to convert next scheduled time query to sql: %w", err) + } + + db := dbtx.GetAccessor(ctx, s.db) + + var result int64 + + err = db.QueryRowContext(ctx, query, args...).Scan(&result) + if errors.Is(err, sql.ErrNoRows) { + return time.Time{}, nil + } + if err != nil { + return time.Time{}, database.ProcessSQLErrorf(err, "failed to execute next scheduled time query") + } + + return time.UnixMilli(result), nil +} + +// DeleteOld removes non-recurring jobs that have finished execution or have failed. +func (s *JobStore) DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) { + stmt := database.Builder. + Delete("jobs"). + Where("(job_state = ? OR job_state = ?)", enum.JobStateFinished, enum.JobStateFailed). + Where("job_is_recurring = false"). + Where("job_last_executed < ?", olderThan.UnixMilli()) + + sql, args, err := stmt.ToSql() + if err != nil { + return 0, fmt.Errorf("failed to convert delete done jobs query to sql: %w", err) + } + + db := dbtx.GetAccessor(ctx, s.db) + + result, err := db.ExecContext(ctx, sql, args...) 
+ if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to execute delete done jobs query") + } + + n, err := result.RowsAffected() + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted jobs") + } + + return n, nil +} diff --git a/internal/store/database/migrate/postgres/0022_create_table_jobs.down.sql b/internal/store/database/migrate/postgres/0022_create_table_jobs.down.sql new file mode 100644 index 000000000..7c5d0fba0 --- /dev/null +++ b/internal/store/database/migrate/postgres/0022_create_table_jobs.down.sql @@ -0,0 +1,4 @@ +DROP INDEX jobs_last_executed; +DROP INDEX jobs_run_deadline; +DROP INDEX jobs_scheduled; +DROP TABLE jobs; diff --git a/internal/store/database/migrate/postgres/0022_create_table_jobs.up.sql b/internal/store/database/migrate/postgres/0022_create_table_jobs.up.sql new file mode 100644 index 000000000..0f6d6f60d --- /dev/null +++ b/internal/store/database/migrate/postgres/0022_create_table_jobs.up.sql @@ -0,0 +1,35 @@ +CREATE TABLE jobs ( + job_uid TEXT NOT NULL +,job_created BIGINT NOT NULL +,job_updated BIGINT NOT NULL +,job_type TEXT NOT NULL +,job_priority INTEGER NOT NULL +,job_data TEXT NOT NULL +,job_result TEXT NOT NULL +,job_max_duration_seconds INTEGER NOT NULL +,job_max_retries INTEGER NOT NULL +,job_state TEXT NOT NULL +,job_scheduled BIGINT NOT NULL +,job_total_executions INTEGER +,job_run_by TEXT NOT NULL +,job_run_deadline BIGINT +,job_run_progress INTEGER NOT NULL +,job_last_executed BIGINT +,job_is_recurring BOOLEAN NOT NULL +,job_recurring_cron TEXT NOT NULL +,job_consecutive_failures INTEGER NOT NULL +,job_last_failure_error TEXT NOT NULL +,CONSTRAINT pk_jobs_uid PRIMARY KEY (job_uid) +); + +CREATE INDEX jobs_scheduled + ON jobs(job_scheduled) + WHERE job_state = 'scheduled'; + +CREATE INDEX jobs_run_deadline + ON jobs(job_run_deadline) + WHERE job_state = 'running'; + +CREATE INDEX jobs_last_executed + ON jobs(job_last_executed) + WHERE job_state = 'finished' OR job_state = 'failed'; diff --git a/internal/store/database/migrate/sqlite/0022_create_table_jobs.down.sql b/internal/store/database/migrate/sqlite/0022_create_table_jobs.down.sql new file mode 100644 index 000000000..7c5d0fba0 --- /dev/null +++ b/internal/store/database/migrate/sqlite/0022_create_table_jobs.down.sql @@ -0,0 +1,4 @@ +DROP INDEX jobs_last_executed; +DROP INDEX jobs_run_deadline; +DROP INDEX jobs_scheduled; +DROP TABLE jobs; diff --git a/internal/store/database/migrate/sqlite/0022_create_table_jobs.up.sql b/internal/store/database/migrate/sqlite/0022_create_table_jobs.up.sql new file mode 100644 index 000000000..9b625f378 --- /dev/null +++ b/internal/store/database/migrate/sqlite/0022_create_table_jobs.up.sql @@ -0,0 +1,35 @@ +CREATE TABLE jobs ( + job_uid TEXT NOT NULL +,job_created BIGINT NOT NULL +,job_updated BIGINT NOT NULL +,job_type TEXT NOT NULL +,job_priority INTEGER NOT NULL +,job_data TEXT NOT NULL +,job_result TEXT NOT NULL +,job_max_duration_seconds INTEGER NOT NULL +,job_max_retries INTEGER NOT NULL +,job_state TEXT NOT NULL +,job_scheduled BIGINT NOT NULL +,job_total_executions INTEGER +,job_run_by TEXT NOT NULL +,job_run_deadline BIGINT +,job_run_progress INTEGER NOT NULL +,job_last_executed BIGINT +,job_is_recurring BOOLEAN NOT NULL +,job_recurring_cron TEXT NOT NULL +,job_consecutive_failures INTEGER NOT NULL +,job_last_failure_error TEXT NOT NULL +,CONSTRAINT pk_jobs_uid PRIMARY KEY (job_uid) +); + +CREATE INDEX jobs_scheduled + ON jobs(job_scheduled) + WHERE job_state = 'scheduled'; + +CREATE INDEX 
jobs_run_deadline + ON jobs(job_run_deadline) + WHERE job_state = 'running'; + +CREATE INDEX jobs_last_executed + ON jobs(job_last_executed) + WHERE job_state = 'finished' OR job_state = 'failed'; diff --git a/internal/store/database/wire.go b/internal/store/database/wire.go index a51f1b7e7..a1a969d41 100644 --- a/internal/store/database/wire.go +++ b/internal/store/database/wire.go @@ -23,6 +23,7 @@ var WireSet = wire.NewSet( ProvidePathStore, ProvideSpaceStore, ProvideRepoStore, + ProvideJobStore, ProvideExecutionStore, ProvidePipelineStore, ProvideStageStore, @@ -87,6 +88,11 @@ func ProvideRepoStore(db *sqlx.DB, pathCache store.PathCache) store.RepoStore { return NewRepoStore(db, pathCache) } +// ProvideJobStore provides a job store. +func ProvideJobStore(db *sqlx.DB) store.JobStore { + return NewJobStore(db) +} + // ProvidePipelineStore provides a pipeline store. func ProvidePipelineStore(db *sqlx.DB) store.PipelineStore { return NewPipelineStore(db) diff --git a/types/config.go b/types/config.go index 81c388072..42608d4b7 100644 --- a/types/config.go +++ b/types/config.go @@ -180,4 +180,13 @@ type Config struct { SendTimeout time.Duration `envconfig:"GITNESS_PUBSUB_SEND_TIMEOUT" default:"60s"` ChannelSize int `envconfig:"GITNESS_PUBSUB_CHANNEL_SIZE" default:"100"` } + + BackgroundJobs struct { + // MaxRunning is maximum number of jobs that can be running at once. + MaxRunning int `envconfig:"GITNESS_JOBS_MAX_RUNNING" default:"10"` + + // PurgeFinishedOlderThan is duration after non-recurring, + // finished and failed jobs will be purged from the DB. + PurgeFinishedOlderThan time.Duration `envconfig:"GITNESS_JOBS_PURGE_FINISHED_OLDER_THAN" default:"120h"` + } } diff --git a/types/enum/job.go b/types/enum/job.go new file mode 100644 index 000000000..a92a060da --- /dev/null +++ b/types/enum/job.go @@ -0,0 +1,25 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. + +package enum + +// JobState represents state of a background job. +type JobState string + +// JobState enumeration. +const ( + JobStateScheduled JobState = "scheduled" + JobStateRunning JobState = "running" + JobStateFinished JobState = "finished" + JobStateFailed JobState = "failed" +) + +// JobPriority represents priority of a background job. +type JobPriority int + +// JobPriority enumeration. +const ( + JobPriorityNormal JobPriority = 0 + JobPriorityElevated JobPriority = 1 +) diff --git a/types/job.go b/types/job.go new file mode 100644 index 000000000..e5ddb9cee --- /dev/null +++ b/types/job.go @@ -0,0 +1,38 @@ +// Copyright 2022 Harness Inc. All rights reserved. +// Use of this source code is governed by the Polyform Free Trial License +// that can be found in the LICENSE.md file for this repository. 
+ +package types + +import "github.com/harness/gitness/types/enum" + +type Job struct { + UID string `db:"job_uid"` + Created int64 `db:"job_created"` + Updated int64 `db:"job_updated"` + Type string `db:"job_type"` + Priority enum.JobPriority `db:"job_priority"` + Data string `db:"job_data"` + Result string `db:"job_result"` + MaxDurationSeconds int `db:"job_max_duration_seconds"` + MaxRetries int `db:"job_max_retries"` + State enum.JobState `db:"job_state"` + Scheduled int64 `db:"job_scheduled"` + TotalExecutions int `db:"job_total_executions"` + RunBy string `db:"job_run_by"` + RunDeadline int64 `db:"job_run_deadline"` + RunProgress int `db:"job_run_progress"` + LastExecuted int64 `db:"job_last_executed"` + IsRecurring bool `db:"job_is_recurring"` + RecurringCron string `db:"job_recurring_cron"` + ConsecutiveFailures int `db:"job_consecutive_failures"` + LastFailureError string `db:"job_last_failure_error"` +} + +type JobStateChange struct { + UID string `json:"uid"` + State enum.JobState `json:"state"` + Progress int `json:"progress"` + Result string `json:"result"` + Failure string `json:"failure"` +}
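A minimal usage sketch of the new job package, with hypothetical names (exportHandler, the "example:export" job type and the wire function are illustrative; only the job package API comes from the change above):

package example

import (
	"context"
	"time"

	"github.com/harness/gitness/internal/services/job"
)

// exportHandler is a hypothetical job handler.
type exportHandler struct{}

// Handle implements job.Handler and reports progress halfway through.
func (h *exportHandler) Handle(ctx context.Context, input string, report job.ProgressReporter) (string, error) {
	if err := report(50, "half done"); err != nil {
		return "", err
	}
	return "done", nil
}

// wire registers the handler and submits a one-off job.
func wire(ctx context.Context, executor *job.Executor, scheduler *job.Scheduler) error {
	// Registration must happen before Scheduler.Run is started.
	if err := executor.Register("example:export", &exportHandler{}); err != nil {
		return err
	}

	// Submit a job with up to 3 retries and a 10-minute timeout; the returned UID
	// can be used with CancelJob or to correlate published JobStateChange events.
	uid, err := scheduler.RunJob(ctx, "example:export", 3, 10*time.Minute, `{"repo":1}`)
	if err != nil {
		return err
	}
	_ = uid
	return nil
}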