Merge branch 'mg/jobs/initial' of _OKE5H2PQKOUfzFFDuD4FA/default/CODE/gitness (#342)

Marko Gacesa 2023-08-29 16:29:37 +00:00 committed by Harness
commit 5a2caa9c3e
26 changed files with 2012 additions and 7 deletions

View File

@ -73,6 +73,10 @@ func (c *command) run(*kingpin.ParseContext) error {
// - ctx is canceled
g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
return system.services.JobScheduler.Run(gCtx)
})
// start server
gHTTP, shutdownHTTP := system.server.ListenAndServe()
g.Go(gHTTP.Wait)
@ -116,6 +120,8 @@ func (c *command) run(*kingpin.ParseContext) error {
}
}
system.services.JobScheduler.WaitJobsDone(shutdownCtx)
log.Info().Msg("wait for subroutines to complete")
err = g.Wait()

View File

@ -44,6 +44,7 @@ import (
"github.com/harness/gitness/internal/server"
"github.com/harness/gitness/internal/services"
"github.com/harness/gitness/internal/services/codecomments"
"github.com/harness/gitness/internal/services/job"
pullreqservice "github.com/harness/gitness/internal/services/pullreq"
"github.com/harness/gitness/internal/services/webhook"
"github.com/harness/gitness/internal/store"
@ -102,6 +103,7 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e
lock.WireSet,
pubsub.WireSet,
codecomments.WireSet,
job.WireSet,
gitrpccron.WireSet,
checkcontroller.WireSet,
execution.WireSet,

View File

@ -42,6 +42,7 @@ import (
server2 "github.com/harness/gitness/internal/server"
"github.com/harness/gitness/internal/services"
"github.com/harness/gitness/internal/services/codecomments"
"github.com/harness/gitness/internal/services/job"
pullreq2 "github.com/harness/gitness/internal/services/pullreq"
"github.com/harness/gitness/internal/services/webhook"
"github.com/harness/gitness/internal/store"
@ -202,7 +203,13 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil {
return nil, err
}
servicesServices := services.ProvideServices(webhookService, pullreqService)
jobStore := database.ProvideJobStore(db)
executor := job.ProvideExecutor(jobStore, pubSub)
scheduler, err := job.ProvideScheduler(jobStore, executor, mutexManager, pubSub, config)
if err != nil {
return nil, err
}
servicesServices := services.ProvideServices(webhookService, pullreqService, executor, scheduler)
serverSystem := server.NewSystem(bootstrapBootstrap, serverServer, grpcServer, manager, servicesServices)
return serverSystem, nil
}

1
go.mod
View File

@ -18,6 +18,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.9
github.com/google/wire v0.5.0
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/gotidy/ptr v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/guregu/null v4.0.0+incompatible

3
go.sum
View File

@ -254,6 +254,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5
github.com/googleapis/gax-go/v2 v2.7.0 h1:IcsPKeInNvYi7eqSaDjiZqDDKu5rsmunY0Y1YupQSSQ=
github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
@ -367,6 +369,7 @@ github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.3.3 h1:j82X0bf7oQ27XeqxicSZsTU5suPwKElg3oyxNn43iTk=
github.com/jmoiron/sqlx v1.3.3/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=

View File

@ -0,0 +1,142 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"context"
"errors"
"fmt"
"runtime/debug"
"time"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
// Executor holds a map of Handler objects, one per registered job type.
// The Scheduler uses the Executor to start execution of jobs.
type Executor struct {
handlerMap map[string]Handler
handlerComplete bool
store store.JobStore
publisher pubsub.Publisher
}
const (
ProgressMin = 0
ProgressMax = 100
)
// ProgressReporter can be used by a job Handler to report back the execution progress.
type ProgressReporter func(progress int, result string) error
// Handler is a job executor for a specific job type.
// An implementation should try to honor the context and
// try to abort the execution as soon as the context is done.
type Handler interface {
Handle(ctx context.Context, input string, fn ProgressReporter) (result string, err error)
}
var errNoHandlerDefined = errors.New("no handler registered for the job type")
// NewExecutor creates new Executor.
func NewExecutor(jobStore store.JobStore, publisher pubsub.Publisher) *Executor {
return &Executor{
handlerMap: make(map[string]Handler),
handlerComplete: false,
store: jobStore,
publisher: publisher,
}
}
// Register registers a job Handler for the provided job type.
// This function is not thread safe. All calls are expected to be made
// in a single thread during application boot.
func (e *Executor) Register(jobType string, exec Handler) error {
if jobType == "" {
return errors.New("jobType must not be empty")
}
if e.handlerComplete {
return errors.New("job handler registration is complete")
}
if exec == nil {
return errors.New("provided Handler is nil")
}
if _, ok := e.handlerMap[jobType]; ok {
return fmt.Errorf("a Handler is already defined to run the '%s' job types", jobType)
}
e.handlerMap[jobType] = exec
return nil
}
// finishRegistration forbids further registration of job types.
// It is called by the Scheduler when it starts.
func (e *Executor) finishRegistration() {
e.handlerComplete = true
}
// exec runs a single job. This function is synchronous,
// so the caller is responsible for running it in a separate goroutine.
func (e *Executor) exec(
ctx context.Context,
jobUID, jobType string,
input string,
) (result string, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf(
"panic while processing job=%s type=%s: %v\n%s",
jobUID, jobType, r, debug.Stack())
}
}()
exec, ok := e.handlerMap[jobType]
if !ok {
return "", noHandlerDefinedError
}
// progressReporter is the function with which the job can update its progress.
// This function will be executed in the job executor's goroutine.
// It uses the job's context.
progressReporter := func(progress int, result string) error {
if progress < ProgressMin || progress > ProgressMax {
return errors.New("progress must be between 0 and 100")
}
jobDummy := &types.Job{
UID: jobUID,
Updated: time.Now().UnixMilli(),
Result: result,
State: enum.JobStateRunning,
RunProgress: progress,
}
// This doesn't need to be behind the global lock because it only updates a single row.
// While a job is running, no other process should touch it.
// Even this call will fail if the context deadline has been exceeded.
// The job parameter is a dummy types.Job object that just holds fields that should be updated.
if err := e.store.UpdateProgress(ctx, jobDummy); err != nil {
return err
}
// tell everybody that a job progress has been updated
if err := publishStateChange(ctx, e.publisher, jobDummy); err != nil {
log.Err(err).Msg("failed to publish job state change")
}
return nil
}
return exec.Handle(ctx, input, progressReporter) // runs the job
}
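
To illustrate the Handler and ProgressReporter contracts above, here is a minimal, hypothetical handler implementation and registration sketch; the repoSizeCalculator type, the job type string and the JSON input format are illustrative assumptions, not part of this commit.

// Hypothetical usage sketch; not part of this commit.
package example

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/harness/gitness/internal/services/job"
)

// repoSizeCalculator is an assumed example handler; its input is assumed to be JSON.
type repoSizeCalculator struct{}

type repoSizeInput struct {
	RepoID int64 `json:"repo_id"`
}

// Handle implements job.Handler. It should honor ctx and abort as soon as the context is done.
func (repoSizeCalculator) Handle(ctx context.Context, input string, report job.ProgressReporter) (string, error) {
	var in repoSizeInput
	if err := json.Unmarshal([]byte(input), &in); err != nil {
		return "", fmt.Errorf("failed to parse input: %w", err)
	}

	// ... perform the actual work here, checking ctx.Err() periodically ...

	// Report intermediate progress (must be between ProgressMin and ProgressMax).
	if err := report(50, ""); err != nil {
		return "", err
	}

	// The returned string is stored as the job result.
	return fmt.Sprintf(`{"repo_id":%d,"size_bytes":0}`, in.RepoID), nil
}

// registerHandlers is expected to run once during application boot,
// before the Scheduler starts and finalizes registration.
func registerHandlers(executor *job.Executor) error {
	return executor.Register("example:repo-size", repoSizeCalculator{})
}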

View File

@ -0,0 +1,89 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
const (
jobUIDOverdue = "gitness:jobs:overdue"
jobTypeOverdue = "gitness:jobs:overdue"
jobCronOverdue = "*/20 * * * *" // every 20 min
)
type jobOverdue struct {
store store.JobStore
mxManager lock.MutexManager
scheduler *Scheduler
}
func newJobOverdue(jobStore store.JobStore, mxManager lock.MutexManager, scheduler *Scheduler) *jobOverdue {
return &jobOverdue{
store: jobStore,
mxManager: mxManager,
scheduler: scheduler,
}
}
// Handle reclaims overdue jobs. Normally this shouldn't happen,
// but it can occur if the DB update after a job execution fails,
// or if the server suddenly terminates while a job is still running.
func (j *jobOverdue) Handle(ctx context.Context, _ string, _ ProgressReporter) (string, error) {
mx, err := globalLock(ctx, j.mxManager)
if err != nil {
return "", fmt.Errorf("failed to obtain the lock to reclaim overdue jobs")
}
defer func() {
if err := mx.Unlock(ctx); err != nil {
log.Err(err).Msg("failed to release global lock after reclaiming overdue jobs")
}
}()
overdueJobs, err := j.store.ListDeadlineExceeded(ctx, time.Now())
if err != nil {
return "", fmt.Errorf("failed to list overdue jobs")
}
if len(overdueJobs) == 0 {
return "", nil
}
var minScheduled time.Time
for _, job := range overdueJobs {
const errorMessage = "deadline exceeded"
postExec(job, "", errorMessage)
err = j.store.UpdateExecution(ctx, job)
if err != nil {
return "", fmt.Errorf("failed update overdue job")
}
if job.State == enum.JobStateScheduled {
scheduled := time.UnixMilli(job.Scheduled)
if minScheduled.IsZero() || minScheduled.After(scheduled) {
minScheduled = scheduled
}
}
}
if !minScheduled.IsZero() {
j.scheduler.scheduleProcessing(minScheduled)
}
result := fmt.Sprintf("found %d overdue jobs", len(overdueJobs))
return result, nil
}

View File

@ -0,0 +1,67 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"context"
"fmt"
"time"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/lock"
"github.com/rs/zerolog/log"
)
const (
jobUIDPurge = "gitness:jobs:purge"
jobTypePurge = "gitness:jobs:purge"
jobCronPurge = "15 */4 * * *" // at minute 15 of every 4th hour
)
type jobPurge struct {
store store.JobStore
mxManager lock.MutexManager
minOldAge time.Duration
}
func newJobPurge(jobStore store.JobStore, mxManager lock.MutexManager, minOldAge time.Duration) *jobPurge {
if minOldAge < 0 {
minOldAge = 0
}
return &jobPurge{
store: jobStore,
mxManager: mxManager,
minOldAge: minOldAge,
}
}
func (j *jobPurge) Handle(ctx context.Context, _ string, _ ProgressReporter) (string, error) {
mx, err := globalLock(ctx, j.mxManager)
if err != nil {
return "", fmt.Errorf("failed to obtain the lock to clean up old jobs")
}
defer func() {
if err := mx.Unlock(ctx); err != nil {
log.Err(err).Msg("failed to release global lock after cleaning up old jobs")
}
}()
olderThan := time.Now().Add(-j.minOldAge)
n, err := j.store.DeleteOld(ctx, olderThan)
if err != nil {
return "", fmt.Errorf("failed to purge old jobs")
}
result := "no old jobs found"
if n > 0 {
result = fmt.Sprintf("deleted %d old jobs", n)
}
return result, nil
}

View File

@ -0,0 +1,23 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"context"
"github.com/harness/gitness/lock"
)
func globalLock(ctx context.Context, manager lock.MutexManager) (lock.Mutex, error) {
const lockKey = "jobs"
mx, err := manager.NewMutex(lockKey)
if err != nil {
return nil, err
}
err = mx.Lock(ctx)
return mx, err
}

View File

@ -0,0 +1,60 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
)
const (
PubSubTopicCancelJob = "gitness:job:cancel_job"
PubSubTopicStateChange = "gitness:job:state_change"
)
func encodeStateChange(job *types.Job) ([]byte, error) {
stateChange := &types.JobStateChange{
UID: job.UID,
State: job.State,
Progress: job.RunProgress,
Result: job.Result,
Failure: job.LastFailureError,
}
buffer := bytes.NewBuffer(nil)
if err := gob.NewEncoder(buffer).Encode(stateChange); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func DecodeStateChange(payload []byte) (*types.JobStateChange, error) {
stateChange := &types.JobStateChange{}
if err := gob.NewDecoder(bytes.NewReader(payload)).Decode(stateChange); err != nil {
return nil, err
}
return stateChange, nil
}
func publishStateChange(ctx context.Context, publisher pubsub.Publisher, job *types.Job) error {
payload, err := encodeStateChange(job)
if err != nil {
return fmt.Errorf("failed to gob encode JobStateChange: %w", err)
}
err = publisher.Publish(ctx, PubSubTopicStateChange, payload)
if err != nil {
return fmt.Errorf("failed to publish JobStateChange: %w", err)
}
return nil
}
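
As a usage sketch, another component could observe job progress by subscribing to PubSubTopicStateChange and decoding payloads with DecodeStateChange; the subscription below simply mirrors how the Scheduler's Run method subscribes to the cancel topic, and the function name and log fields are assumptions, not part of this commit.

// Hypothetical usage sketch; not part of this commit.
package example

import (
	"context"

	"github.com/harness/gitness/internal/services/job"
	"github.com/harness/gitness/pubsub"

	"github.com/rs/zerolog/log"
)

// watchJobStates logs every job state change until the context is done.
func watchJobStates(ctx context.Context, ps pubsub.PubSub) error {
	consumer := ps.Subscribe(ctx, job.PubSubTopicStateChange, func(payload []byte) error {
		stateChange, err := job.DecodeStateChange(payload)
		if err != nil {
			return err
		}
		log.Ctx(ctx).Info().
			Str("job.UID", stateChange.UID).
			Str("job.State", string(stateChange.State)).
			Int("job.Progress", stateChange.Progress).
			Msg("job state changed")
		return nil
	})
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Ctx(ctx).Err(err).Msg("failed to close job state change consumer")
		}
	}()

	<-ctx.Done()
	return ctx.Err()
}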

View File

@ -0,0 +1,688 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/gorhill/cronexpr"
"github.com/rs/zerolog/log"
)
// Scheduler controls execution of background jobs.
type Scheduler struct {
// dependencies
store store.JobStore
executor *Executor
mxManager lock.MutexManager
pubsubService pubsub.PubSub
// configuration fields
instanceID string
maxRunning int
purgeMinOldAge time.Duration
// synchronization stuff
signal chan time.Time
done chan struct{}
wgRunning sync.WaitGroup
cancelJobMx sync.Mutex
cancelJobMap map[string]context.CancelFunc
}
func NewScheduler(
jobStore store.JobStore,
executor *Executor,
mxManager lock.MutexManager,
pubsubService pubsub.PubSub,
instanceID string,
maxRunning int,
purgeMinOldAge time.Duration,
) (*Scheduler, error) {
if maxRunning < 1 {
maxRunning = 1
}
return &Scheduler{
store: jobStore,
executor: executor,
mxManager: mxManager,
pubsubService: pubsubService,
instanceID: instanceID,
maxRunning: maxRunning,
purgeMinOldAge: purgeMinOldAge,
cancelJobMap: map[string]context.CancelFunc{},
}, nil
}
// Run runs the background job scheduler.
// It is a blocking call that returns only once the provided context is done.
func (s *Scheduler) Run(ctx context.Context) error {
if s.done != nil {
return errors.New("already started")
}
consumer := s.pubsubService.Subscribe(ctx, PubSubTopicCancelJob, s.handleCancelJob)
defer func() {
err := consumer.Close()
if err != nil {
log.Ctx(ctx).Err(err).
Msg("job scheduler: failed to close pubsub cancel job consumer")
}
}()
if err := s.createNecessaryJobs(ctx); err != nil {
return fmt.Errorf("failed to create necessary jobs: %w", err)
}
if err := s.registerNecessaryJobs(); err != nil {
return fmt.Errorf("failed to register scheduler's internal jobs: %w", err)
}
s.executor.finishRegistration()
log.Ctx(ctx).Debug().Msg("job scheduler: starting")
s.done = make(chan struct{})
defer close(s.done)
s.signal = make(chan time.Time, 1)
timer := newSchedulerTimer()
defer timer.Stop()
for {
err := func() error {
defer func() {
if r := recover(); r != nil {
stack := string(debug.Stack())
log.Ctx(ctx).Error().
Str("panic", fmt.Sprintf("[%T] job scheduler panic: %v", r, r)).
Msg(stack)
}
}()
select {
case <-ctx.Done():
return ctx.Err()
case newTime := <-s.signal:
dur := timer.RescheduleEarlier(newTime)
if dur > 0 {
log.Ctx(ctx).Trace().
Msgf("job scheduler: update of scheduled job processing time... runs in %s", dur)
}
return nil
case now := <-timer.Ch():
count, nextExec, gotAllJobs, err := s.processReadyJobs(ctx, now)
// If the next processing time isn't known, use the default.
if nextExec.IsZero() {
const period = time.Minute
nextExec = now.Add(period)
}
// Reset the timer. Make the timer edgy if there are more jobs available.
dur := timer.ResetAt(nextExec, !gotAllJobs)
if err != nil {
log.Ctx(ctx).Err(err).
Msgf("job scheduler: failed to process jobs; next iteration in %s", dur)
} else {
log.Ctx(ctx).Trace().
Msgf("job scheduler: started %d jobs; next iteration in %s", count, dur)
}
return nil
}
}()
if err != nil {
return err
}
}
}
// WaitJobsDone waits until execution of all jobs has finished.
// It is intended to be used for graceful shutdown, after the Run method has finished.
func (s *Scheduler) WaitJobsDone(ctx context.Context) {
log.Ctx(ctx).Debug().Msg("job scheduler: stopping... waiting for the currently running jobs to finish")
ch := make(chan struct{})
go func() {
s.wgRunning.Wait()
close(ch)
}()
select {
case <-ctx.Done():
log.Ctx(ctx).Warn().Msg("job scheduler: stop interrupted")
case <-ch:
log.Ctx(ctx).Info().Msg("job scheduler: gracefully stopped")
}
}
func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error {
s.cancelJobMx.Lock()
cancelFn, ok := s.cancelJobMap[jobUID]
s.cancelJobMx.Unlock()
if ok {
cancelFn()
return nil
}
return s.pubsubService.Publish(ctx, PubSubTopicCancelJob, []byte(jobUID))
}
func (s *Scheduler) handleCancelJob(payload []byte) error {
jobUID := string(payload)
if jobUID == "" {
return nil
}
s.cancelJobMx.Lock()
cancelFn, ok := s.cancelJobMap[jobUID]
s.cancelJobMx.Unlock()
if ok {
cancelFn()
}
return nil
}
// scheduleProcessing triggers processing of ready jobs.
// This should be run after adding new jobs to the database.
func (s *Scheduler) scheduleProcessing(scheduled time.Time) {
go func() {
select {
case <-s.done:
case s.signal <- scheduled:
}
}()
}
// scheduleIfHaveMoreJobs triggers processing of ready jobs if the timer is edgy.
// The timer would be edgy if the previous iteration found more jobs than it could start (full capacity).
// This should be run after a non-recurring job has finished.
func (s *Scheduler) scheduleIfHaveMoreJobs() {
s.scheduleProcessing(time.Time{}) // zero time will trigger the timer if it's edgy
}
// RunJob runs a single job of the provided type.
// All parameters a job receives must be inside the data string (as JSON or whatever the job handler can interpret).
// The caller gets the job UID which can then be used to track execution (the job state and progress percentage).
func (s *Scheduler) RunJob(ctx context.Context,
jobType string,
maxRetries int,
timeout time.Duration,
data string,
) (string, error) {
if jobType == "" {
return "", errors.New("jobType must not be empty")
}
if maxRetries < 0 {
return "", errors.New("maxRetries must be positive")
}
if timeout < time.Second {
return "", errors.New("timeout too short")
}
uid, err := UID()
if err != nil {
return "", fmt.Errorf("failed to generate job UID: %w", err)
}
nowMilli := time.Now().UnixMilli()
job := &types.Job{
UID: uid,
Created: nowMilli,
Updated: nowMilli,
Type: jobType,
Priority: enum.JobPriorityNormal,
Data: data,
Result: "",
MaxDurationSeconds: int(timeout / time.Second),
MaxRetries: maxRetries,
State: enum.JobStateScheduled,
Scheduled: nowMilli,
TotalExecutions: 0,
RunBy: "",
RunDeadline: nowMilli,
RunProgress: ProgressMin,
LastExecuted: 0, // never executed
IsRecurring: false,
RecurringCron: "",
ConsecutiveFailures: 0,
LastFailureError: "",
}
ctx = log.Ctx(ctx).With().
Str("job.UID", job.UID).
Str("job.Type", job.Type).
Logger().WithContext(ctx)
mx, err := globalLock(ctx, s.mxManager)
if err != nil {
return "", fmt.Errorf("failed to obtain global lock to add new job: %w", err)
}
defer func() {
if err := mx.Unlock(ctx); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to release global lock after adding a new job")
}
}()
err = s.store.Create(ctx, job)
if err != nil {
return "", fmt.Errorf("failed to add new job to the database: %w", err)
}
available, err := s.availableSlots(ctx)
if err != nil {
return "", fmt.Errorf("failed to count available slots for job execution: %w", err)
}
if available == 0 {
// already running at full capacity: the job is scheduled, we're done.
return uid, nil
}
err = s.runJob(ctx, job)
if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to run job")
}
return uid, nil
}
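
A caller-side sketch of RunJob, assuming a hypothetical job type whose handler was registered on the Executor during boot; the input format, retry count and timeout below are illustrative assumptions, not part of this commit.

// Hypothetical usage sketch; not part of this commit.
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/harness/gitness/internal/services/job"
)

// triggerRepoExport schedules a one-off job and returns its UID so the caller
// can track its state (or cancel it later with scheduler.CancelJob).
func triggerRepoExport(ctx context.Context, scheduler *job.Scheduler, repoID int64) (string, error) {
	input := fmt.Sprintf(`{"repo_id":%d}`, repoID) // data is passed verbatim to the handler

	jobUID, err := scheduler.RunJob(ctx,
		"example:repo-export", // jobType, assumed to be registered on the Executor
		2,                     // maxRetries
		5*time.Minute,         // timeout, must be at least one second
		input,
	)
	if err != nil {
		return "", fmt.Errorf("failed to schedule repo export job: %w", err)
	}
	return jobUID, nil
}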
// processReadyJobs executes jobs that are ready to run. This function is periodically run by the Scheduler.
// The function returns the number of jobs it has started, the next scheduled execution time (of this function)
// and a bool value indicating whether all currently available ready jobs were started.
// Internally the Scheduler uses an "edgy" timer to reschedule calls of this function.
// The edgy option of the timer will be on if this function hasn't been able to start all jobs that are ready to run.
// If the timer has the edgy option turned on, it will be triggered (and thus this function will be called)
// when any currently running job finishes successfully or fails.
func (s *Scheduler) processReadyJobs(ctx context.Context, now time.Time) (int, time.Time, bool, error) {
mx, err := globalLock(ctx, s.mxManager)
if err != nil {
return 0, time.Time{}, false,
fmt.Errorf("failed to obtain global lock to periodically process ready jobs: %w", err)
}
defer func() {
if err := mx.Unlock(ctx); err != nil {
log.Ctx(ctx).Err(err).
Msg("failed to release global lock after periodic processing of ready jobs")
}
}()
availableCount, err := s.availableSlots(ctx)
if err != nil {
return 0, time.Time{}, false,
fmt.Errorf("failed to count available slots for job execution: %w", err)
}
// get one over the limit to check if all ready jobs are fetched
jobs, err := s.store.ListReady(ctx, now, availableCount+1)
if err != nil {
return 0, time.Time{}, false,
fmt.Errorf("failed to load scheduled jobs: %w", err)
}
var (
countExecuted int
knownNextExecTime time.Time
gotAllJobs bool
)
if len(jobs) > availableCount {
// More jobs are ready than we are able to run.
jobs = jobs[:availableCount]
} else {
gotAllJobs = true
knownNextExecTime, err = s.store.NextScheduledTime(ctx, now)
if err != nil {
return 0, time.Time{}, false,
fmt.Errorf("failed to read next scheduled time: %w", err)
}
}
for _, job := range jobs {
jobCtx := log.Ctx(ctx).With().
Str("job.UID", job.UID).
Str("job.Type", job.Type).
Logger().WithContext(ctx)
err = s.runJob(jobCtx, job)
if err != nil {
knownNextExecTime = time.Time{}
gotAllJobs = false
log.Ctx(jobCtx).Err(err).Msg("failed to run job")
continue
}
countExecuted++
}
return countExecuted, knownNextExecTime, gotAllJobs, nil
}
func (s *Scheduler) availableSlots(ctx context.Context) (int, error) {
countRunning, err := s.store.CountRunning(ctx)
if err != nil {
return 0, err
}
availableCount := s.maxRunning - countRunning
if availableCount < 0 {
return 0, nil
}
return availableCount, nil
}
// runJob updates the job in the database and starts it in a separate goroutine.
// The function will also log the execution.
func (s *Scheduler) runJob(ctx context.Context, job *types.Job) error {
// Update the job fields for the new execution
s.preExec(job)
if err := s.store.UpdateExecution(ctx, job); err != nil {
return err
}
// tell everybody that a job has started
if err := publishStateChange(ctx, s.pubsubService, job); err != nil {
log.Err(err).Msg("failed to publish job state change")
}
s.wgRunning.Add(1)
go func(jobCtx context.Context, job *types.Job) {
defer s.wgRunning.Done()
log.Ctx(jobCtx).Debug().Msg("started job")
timeStart := time.Now()
// Run the job
execResult, execFailure := s.doExec(jobCtx, job)
// Update the job fields, reschedule if necessary.
postExec(job, execResult, execFailure)
// Use context.Background() because we want to update the job even if the job's context is done.
// The context can be done because the job exceeded its deadline or the server is shutting down.
backgroundCtx := context.Background()
// tell everybody that a job has finished execution
if err := publishStateChange(backgroundCtx, s.pubsubService, job); err != nil {
log.Ctx(jobCtx).Err(err).Msg("failed to publish job state change")
}
if mx, err := globalLock(backgroundCtx, s.mxManager); err != nil {
// If locking failed, just log the error and proceed to update the DB anyway.
log.Ctx(jobCtx).Err(err).Msg("failed to obtain global lock to update job after execution")
} else {
defer func() {
if err := mx.Unlock(backgroundCtx); err != nil {
log.Ctx(jobCtx).Err(err).Msg("failed to release global lock to update job after execution")
}
}()
}
if err := s.store.UpdateExecution(backgroundCtx, job); err != nil {
log.Ctx(jobCtx).Err(err).Msg("failed to update after execution")
return
}
logInfo := log.Ctx(jobCtx).Info().Str("duration", time.Since(timeStart).String())
if job.IsRecurring {
logInfo = logInfo.Bool("job.IsRecurring", true)
}
if job.Result != "" {
logInfo = logInfo.Str("job.Result", job.Result)
}
if job.LastFailureError != "" {
logInfo = logInfo.Str("job.Failure", job.LastFailureError)
}
switch job.State {
case enum.JobStateFinished:
logInfo.Msg("job successfully finished")
s.scheduleIfHaveMoreJobs()
case enum.JobStateFailed:
logInfo.Msg("job failed")
s.scheduleIfHaveMoreJobs()
case enum.JobStateScheduled:
scheduledTime := time.UnixMilli(job.Scheduled)
logInfo.
Str("job.Scheduled", scheduledTime.Format(time.RFC3339Nano)).
Msg("job finished and rescheduled")
s.scheduleProcessing(scheduledTime)
case enum.JobStateRunning:
log.Ctx(jobCtx).Error().Msg("should not happen; job still has state=running after finishing")
}
}(ctx, job)
return nil
}
// preExec updates the provided types.Job before execution.
func (s *Scheduler) preExec(job *types.Job) {
if job.MaxDurationSeconds < 1 {
job.MaxDurationSeconds = 1
}
now := time.Now()
nowMilli := now.UnixMilli()
execDuration := time.Duration(job.MaxDurationSeconds) * time.Second
execDeadline := now.Add(execDuration)
job.Updated = nowMilli
job.LastExecuted = nowMilli
job.State = enum.JobStateRunning
job.RunDeadline = execDeadline.UnixMilli()
job.RunBy = s.instanceID
job.RunProgress = ProgressMin
job.TotalExecutions++
job.Result = ""
job.LastFailureError = ""
}
// doExec executes the provided types.Job.
func (s *Scheduler) doExec(ctx context.Context, job *types.Job) (execResult, execError string) {
execDeadline := time.UnixMilli(job.RunDeadline)
jobCtx, done := context.WithDeadline(ctx, execDeadline)
defer done()
s.cancelJobMx.Lock()
if _, ok := s.cancelJobMap[job.UID]; ok {
// should not happen: jobs have unique UIDs!
s.cancelJobMx.Unlock()
return "", "failed to start: already running"
}
s.cancelJobMap[job.UID] = done
s.cancelJobMx.Unlock()
defer func() {
s.cancelJobMx.Lock()
delete(s.cancelJobMap, job.UID)
s.cancelJobMx.Unlock()
}()
execResult, err := s.executor.exec(jobCtx, job.UID, job.Type, job.Data)
if err != nil {
execError = err.Error()
}
return
}
// postExec updates the provided types.Job after execution and reschedules it if necessary.
func postExec(job *types.Job, resultData, resultErr string) {
now := time.Now()
nowMilli := now.UnixMilli()
job.Updated = nowMilli
job.Result = resultData
job.RunBy = ""
if resultErr != "" {
job.ConsecutiveFailures++
job.State = enum.JobStateFailed
job.LastFailureError = resultErr
} else {
job.State = enum.JobStateFinished
job.RunProgress = ProgressMax
}
// Reschedule recurring jobs
if job.IsRecurring {
if resultErr == "" {
job.ConsecutiveFailures = 0
}
exp, err := cronexpr.Parse(job.RecurringCron)
if err != nil {
job.State = enum.JobStateFailed
messages := fmt.Sprintf("failed to parse cron string: %s", err.Error())
if job.LastFailureError != "" {
messages = messages + "; " + job.LastFailureError
}
job.LastFailureError = messages
} else {
job.State = enum.JobStateScheduled
job.Scheduled = exp.Next(now).UnixMilli()
}
return
}
// Reschedule the failed job if retrying is allowed
if job.State == enum.JobStateFailed && job.ConsecutiveFailures <= job.MaxRetries {
const retryDelay = 15 * time.Second
job.State = enum.JobStateScheduled
job.Scheduled = now.Add(retryDelay).UnixMilli()
job.RunProgress = ProgressMin
}
}
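
To make the recurring rescheduling above concrete, here is a small standalone sketch of how github.com/gorhill/cronexpr (added to go.mod by this commit) resolves the next run time from a cron definition; the cron string used is jobCronOverdue.

// Hypothetical standalone sketch; not part of this commit.
package main

import (
	"fmt"
	"time"

	"github.com/gorhill/cronexpr"
)

func main() {
	// "*/20 * * * *" is the overdue-job schedule (every 20 minutes).
	exp, err := cronexpr.Parse("*/20 * * * *")
	if err != nil {
		panic(err)
	}

	now := time.Now()
	next := exp.Next(now)

	// postExec stores this as job.Scheduled = next.UnixMilli() for recurring jobs.
	fmt.Printf("now=%s next=%s scheduled_millis=%d\n",
		now.Format(time.RFC3339), next.Format(time.RFC3339), next.UnixMilli())
}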
func (s *Scheduler) AddRecurring(
ctx context.Context,
jobUID,
jobType,
cronDef string,
maxDur time.Duration,
) error {
cronExp, err := cronexpr.Parse(cronDef)
if err != nil {
return fmt.Errorf("invalid cron definition string for job type=%s: %w", jobType, err)
}
now := time.Now()
nowMilli := now.UnixMilli()
nextExec := cronExp.Next(now)
job := &types.Job{
UID: jobUID,
Created: nowMilli,
Updated: nowMilli,
Type: jobType,
Priority: enum.JobPriorityElevated,
Data: "",
Result: "",
MaxDurationSeconds: int(maxDur / time.Second),
MaxRetries: 0,
State: enum.JobStateScheduled,
Scheduled: nextExec.UnixMilli(),
TotalExecutions: 0,
RunBy: "",
RunDeadline: 0,
RunProgress: 0,
LastExecuted: 0,
IsRecurring: true,
RecurringCron: cronDef,
ConsecutiveFailures: 0,
LastFailureError: "",
}
err = s.store.Upsert(ctx, job)
if err != nil {
return fmt.Errorf("failed to upsert job id=%s type=%s: %w", jobUID, jobType, err)
}
return nil
}
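
As a usage sketch of AddRecurring, a hypothetical caller could register its own handler and schedule it as a recurring job; the UID, job type, cron string and maximum duration below are assumptions, not part of this commit.

// Hypothetical usage sketch; not part of this commit.
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/harness/gitness/internal/services/job"
)

// setupNightlyCleanup registers a handler and upserts a recurring job definition.
// Registration must happen before Scheduler.Run is called, since Run finalizes registration.
// (The scheduler's own createNecessaryJobs takes the global lock before calling AddRecurring;
// a production caller may want to do the same.)
func setupNightlyCleanup(ctx context.Context, executor *job.Executor, scheduler *job.Scheduler, h job.Handler) error {
	const (
		uid     = "example:cleanup" // assumed job UID
		jobType = "example:cleanup" // assumed job type
		cron    = "30 3 * * *"      // every day at 03:30
	)

	if err := executor.Register(jobType, h); err != nil {
		return fmt.Errorf("failed to register cleanup handler: %w", err)
	}

	if err := scheduler.AddRecurring(ctx, uid, jobType, cron, time.Minute); err != nil {
		return fmt.Errorf("failed to add recurring cleanup job: %w", err)
	}
	return nil
}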
func (s *Scheduler) createNecessaryJobs(ctx context.Context) error {
mx, err := globalLock(ctx, s.mxManager)
if err != nil {
return fmt.Errorf("failed to obtain global lock to create necessary jobs: %w", err)
}
defer func() {
if err := mx.Unlock(ctx); err != nil {
log.Ctx(ctx).Err(err).
Msg("failed to release global lock after creating necessary jobs")
}
}()
err = s.AddRecurring(ctx, jobUIDPurge, jobTypePurge, jobCronPurge, 5*time.Second)
if err != nil {
return err
}
err = s.AddRecurring(ctx, jobUIDOverdue, jobTypeOverdue, jobCronOverdue, 5*time.Second)
if err != nil {
return err
}
return nil
}
// registerNecessaryJobs registers two jobs: overdue job recovery and purging of old finished jobs.
// These two job types are an integral part of the job scheduler.
func (s *Scheduler) registerNecessaryJobs() error {
handlerOverdue := newJobOverdue(s.store, s.mxManager, s)
err := s.executor.Register(jobTypeOverdue, handlerOverdue)
if err != nil {
return err
}
handlerPurge := newJobPurge(s.store, s.mxManager, s.purgeMinOldAge)
err = s.executor.Register(jobTypePurge, handlerPurge)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,111 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"time"
)
const timerMaxDur = 30 * time.Minute
const timerMinDur = time.Nanosecond
type schedulerTimer struct {
timerAt time.Time
timer *time.Timer
edgy bool // if true, the next RescheduleEarlier call will trigger the timer immediately.
}
// newSchedulerTimer creates a new timer for the Scheduler. It is created to fire immediately.
func newSchedulerTimer() *schedulerTimer {
return &schedulerTimer{
timerAt: time.Now().Add(timerMinDur),
timer: time.NewTimer(timerMinDur),
}
}
// ResetAt resets the internal timer to trigger at the provided time.
// A zero or past time fires the timer (almost) immediately; times further away than timerMaxDur are capped at timerMaxDur.
func (t *schedulerTimer) ResetAt(next time.Time, edgy bool) time.Duration {
return t.resetAt(time.Now(), next, edgy)
}
func (t *schedulerTimer) resetAt(now, next time.Time, edgy bool) time.Duration {
dur := next.Sub(now)
if dur < timerMinDur {
dur = timerMinDur
next = now.Add(dur)
} else if dur > timerMaxDur {
dur = timerMaxDur
next = now.Add(dur)
}
t.Stop()
t.edgy = edgy
t.timerAt = next
t.timer.Reset(dur)
return dur
}
// RescheduleEarlier resets the timer if the new time is earlier than the currently scheduled time,
// or immediately if the timer is edgy. Otherwise, the function does nothing and returns 0.
// Providing a zero time triggers the timer only if it's edgy; otherwise it does nothing.
func (t *schedulerTimer) RescheduleEarlier(next time.Time) time.Duration {
return t.rescheduleEarlier(time.Now(), next)
}
func (t *schedulerTimer) rescheduleEarlier(now, next time.Time) time.Duration {
var dur time.Duration
switch {
case t.edgy:
// if the timer is edgy trigger it immediately
dur = timerMinDur
case next.IsZero():
// if the provided time is zero: trigger the timer if it's edgy otherwise do nothing
if !t.edgy {
return 0
}
dur = timerMinDur
case !next.Before(t.timerAt):
// do nothing if the timer is already scheduled to run sooner than the provided time
return 0
default:
dur = next.Sub(now)
if dur < timerMinDur {
dur = timerMinDur
}
}
next = now.Add(dur)
t.Stop()
t.timerAt = next
t.timer.Reset(dur)
return dur
}
func (t *schedulerTimer) Ch() <-chan time.Time {
return t.timer.C
}
func (t *schedulerTimer) Stop() {
// stop the timer
t.timer.Stop()
// consume the timer's tick if any
select {
case <-t.timer.C:
default:
}
t.timerAt = time.Time{}
}

View File

@ -0,0 +1,105 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"testing"
"time"
)
func TestSchedulerTimer_ResetAt(t *testing.T) {
now := time.Now()
tests := []struct {
name string
at time.Time
exp time.Duration
}{
{
name: "zero",
at: time.Time{},
exp: timerMinDur,
},
{
name: "immediate",
at: now,
exp: timerMinDur,
},
{
name: "30s",
at: now.Add(30 * time.Second),
exp: 30 * time.Second,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
timer := newSchedulerTimer()
dur := timer.resetAt(now, test.at, false)
if want, got := test.exp, dur; want != got {
t.Errorf("want: %s, got: %s", want.String(), got.String())
}
})
}
}
func TestSchedulerTimer_RescheduleEarlier(t *testing.T) {
now := time.Now()
tests := []struct {
name string
at time.Time
edgy bool
exp time.Duration
}{
{
name: "past",
at: now.Add(-time.Second),
exp: timerMinDur,
},
{
name: "30s",
at: now.Add(30 * time.Second),
exp: 30 * time.Second,
},
{
name: "90s",
at: now.Add(90 * time.Second),
exp: 0,
},
{
name: "30s-edgy",
at: now.Add(30 * time.Second),
edgy: true,
exp: timerMinDur,
},
{
name: "90s-edgy",
at: now.Add(90 * time.Second),
edgy: true,
exp: timerMinDur,
},
{
name: "zero",
at: time.Time{},
exp: 0,
},
{
name: "zero-edgy",
at: time.Time{},
edgy: true,
exp: timerMinDur,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
timer := newSchedulerTimer()
timer.resetAt(now, now.Add(time.Minute), test.edgy)
dur := timer.rescheduleEarlier(now, test.at)
if want, got := test.exp, dur; want != got {
t.Errorf("want: %s, got: %s", want.String(), got.String())
}
})
}
}

View File

@ -0,0 +1,25 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"crypto/rand"
"encoding/base32"
)
// UID returns a unique random string with length equal to 16.
func UID() (string, error) {
const uidSizeBytes = 10 // must be divisible by 5, the resulting string length will be uidSizeBytes/5*8
var buf [uidSizeBytes]byte
_, err := rand.Read(buf[:])
if err != nil {
return "", err
}
uid := base32.StdEncoding.EncodeToString(buf[:])
return uid, nil
}
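
A tiny sketch illustrating the length arithmetic in the comment above: base32 maps every 5 input bytes to 8 output characters, so 10 random bytes produce a 16-character string with no padding. The package and function name are assumptions.

// Hypothetical usage sketch; not part of this commit.
package example

import (
	"fmt"

	"github.com/harness/gitness/internal/services/job"
)

func printUID() {
	uid, err := job.UID()
	if err != nil {
		panic(err)
	}
	// len(uid) is 16: 10 bytes / 5 * 8 characters, with no '=' padding needed.
	fmt.Println(uid, len(uid))
}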

View File

@ -0,0 +1,47 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/lock"
"github.com/harness/gitness/pubsub"
"github.com/harness/gitness/types"
"github.com/google/wire"
)
var WireSet = wire.NewSet(
ProvideExecutor,
ProvideScheduler,
)
func ProvideExecutor(
jobStore store.JobStore,
pubsubService pubsub.PubSub,
) *Executor {
return NewExecutor(
jobStore,
pubsubService,
)
}
func ProvideScheduler(
jobStore store.JobStore,
executor *Executor,
mutexManager lock.MutexManager,
pubsubService pubsub.PubSub,
config *types.Config,
) (*Scheduler, error) {
return NewScheduler(
jobStore,
executor,
mutexManager,
pubsubService,
config.InstanceID,
config.BackgroundJobs.MaxRunning,
config.BackgroundJobs.PurgeFinishedOlderThan,
)
}

View File

@ -5,6 +5,7 @@
package services
import (
"github.com/harness/gitness/internal/services/job"
"github.com/harness/gitness/internal/services/pullreq"
"github.com/harness/gitness/internal/services/webhook"
@ -16,16 +17,22 @@ var WireSet = wire.NewSet(
)
type Services struct {
ws *webhook.Service
bms *pullreq.Service
Webhook *webhook.Service
PullReq *pullreq.Service
JobExecutor *job.Executor
JobScheduler *job.Scheduler
}
func ProvideServices(
ws *webhook.Service,
bms *pullreq.Service,
webhooksSrv *webhook.Service,
pullReqSrv *pullreq.Service,
jobExecutor *job.Executor,
jobScheduler *job.Scheduler,
) Services {
return Services{
ws: ws,
bms: bms,
Webhook: webhooksSrv,
PullReq: pullReqSrv,
JobExecutor: jobExecutor,
JobScheduler: jobScheduler,
}
}

View File

@ -441,6 +441,42 @@ type (
Delete(ctx context.Context, repoID, reqCheckID int64) error
}
JobStore interface {
// Find fetches a job by its unique identifier.
Find(ctx context.Context, uid string) (*types.Job, error)
// Create is used to create a new job.
Create(ctx context.Context, job *types.Job) error
// Upsert will insert the job in the database if the job didn't already exist,
// or it will update the existing one but only if its definition has changed.
Upsert(ctx context.Context, job *types.Job) error
// UpdateDefinition is used to update a job definition.
UpdateDefinition(ctx context.Context, job *types.Job) error
// UpdateExecution is used to update a job before and after execution.
UpdateExecution(ctx context.Context, job *types.Job) error
// UpdateProgress is used to update a job's progress data.
UpdateProgress(ctx context.Context, job *types.Job) error
// CountRunning returns the number of jobs that are currently being run.
CountRunning(ctx context.Context) (int, error)
// ListReady returns a list of jobs that are ready for execution.
ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error)
// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline.
ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error)
// NextScheduledTime returns a scheduled time of the next ready job.
NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error)
// DeleteOld removes non-recurring jobs that have finished execution or have failed.
DeleteOld(ctx context.Context, olderThan time.Time) (int64, error)
}
PipelineStore interface {
// Find returns a pipeline given a pipeline ID from the datastore.
Find(ctx context.Context, id int64) (*types.Pipeline, error)

View File

@ -0,0 +1,430 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package database
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/harness/gitness/internal/store"
gitness_store "github.com/harness/gitness/store"
"github.com/harness/gitness/store/database"
"github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/jmoiron/sqlx"
)
var _ store.JobStore = (*JobStore)(nil)
func NewJobStore(db *sqlx.DB) *JobStore {
return &JobStore{
db: db,
}
}
type JobStore struct {
db *sqlx.DB
}
const (
jobColumns = `
job_uid
,job_created
,job_updated
,job_type
,job_priority
,job_data
,job_result
,job_max_duration_seconds
,job_max_retries
,job_state
,job_scheduled
,job_total_executions
,job_run_by
,job_run_deadline
,job_run_progress
,job_last_executed
,job_is_recurring
,job_recurring_cron
,job_consecutive_failures
,job_last_failure_error`
jobSelectBase = `
SELECT` + jobColumns + `
FROM jobs`
)
// Find fetches a job by its unique identifier.
func (s *JobStore) Find(ctx context.Context, uid string) (*types.Job, error) {
const sqlQuery = jobSelectBase + `
WHERE job_uid = $1`
db := dbtx.GetAccessor(ctx, s.db)
result := &types.Job{}
if err := db.GetContext(ctx, result, sqlQuery, uid); err != nil {
return nil, database.ProcessSQLErrorf(err, "Failed to find job by uid")
}
return result, nil
}
// Create creates a new job.
func (s *JobStore) Create(ctx context.Context, job *types.Job) error {
const sqlQuery = `
INSERT INTO jobs (` + jobColumns + `
) VALUES (
:job_uid
,:job_created
,:job_updated
,:job_type
,:job_priority
,:job_data
,:job_result
,:job_max_duration_seconds
,:job_max_retries
,:job_state
,:job_scheduled
,:job_total_executions
,:job_run_by
,:job_run_deadline
,:job_run_progress
,:job_last_executed
,:job_is_recurring
,:job_recurring_cron
,:job_consecutive_failures
,:job_last_failure_error
)`
db := dbtx.GetAccessor(ctx, s.db)
query, arg, err := db.BindNamed(sqlQuery, job)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to bind job object")
}
if _, err := db.ExecContext(ctx, query, arg...); err != nil {
return database.ProcessSQLErrorf(err, "Insert query failed")
}
return nil
}
// Upsert creates or updates a job. If the job didn't exist it will insert it in the database,
// otherwise it will update it but only if its definition has changed.
func (s *JobStore) Upsert(ctx context.Context, job *types.Job) error {
const sqlQuery = `
INSERT INTO jobs (` + jobColumns + `
) VALUES (
:job_uid
,:job_created
,:job_updated
,:job_type
,:job_priority
,:job_data
,:job_result
,:job_max_duration_seconds
,:job_max_retries
,:job_state
,:job_scheduled
,:job_total_executions
,:job_run_by
,:job_run_deadline
,:job_run_progress
,:job_last_executed
,:job_is_recurring
,:job_recurring_cron
,:job_consecutive_failures
,:job_last_failure_error
)
ON CONFLICT (job_uid) DO
UPDATE SET
job_updated = :job_updated
,job_type = :job_type
,job_priority = :job_priority
,job_data = :job_data
,job_result = :job_result
,job_max_duration_seconds = :job_max_duration_seconds
,job_max_retries = :job_max_retries
,job_state = :job_state
,job_scheduled = :job_scheduled
,job_is_recurring = :job_is_recurring
,job_recurring_cron = :job_recurring_cron
WHERE
jobs.job_type <> :job_type OR
jobs.job_priority <> :job_priority OR
jobs.job_data <> :job_data OR
jobs.job_max_duration_seconds <> :job_max_duration_seconds OR
jobs.job_max_retries <> :job_max_retries OR
jobs.job_is_recurring <> :job_is_recurring OR
jobs.job_recurring_cron <> :job_recurring_cron`
db := dbtx.GetAccessor(ctx, s.db)
query, arg, err := db.BindNamed(sqlQuery, job)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to bind job object")
}
if _, err := db.ExecContext(ctx, query, arg...); err != nil {
return database.ProcessSQLErrorf(err, "Upsert query failed")
}
return nil
}
// UpdateDefinition is used to update a job definition.
func (s *JobStore) UpdateDefinition(ctx context.Context, job *types.Job) error {
const sqlQuery = `
UPDATE jobs
SET
job_updated = :job_updated
,job_type = :job_type
,job_priority = :job_priority
,job_data = :job_data
,job_result = :job_result
,job_max_duration_seconds = :job_max_duration_seconds
,job_max_retries = :job_max_retries
,job_state = :job_state
,job_scheduled = :job_scheduled
,job_is_recurring = :job_is_recurring
,job_recurring_cron = :job_recurring_cron
WHERE job_uid = :job_uid`
db := dbtx.GetAccessor(ctx, s.db)
query, arg, err := db.BindNamed(sqlQuery, job)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to bind job object for update")
}
result, err := db.ExecContext(ctx, query, arg...)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to update job definition")
}
count, err := result.RowsAffected()
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to get number of updated rows")
}
if count == 0 {
return gitness_store.ErrResourceNotFound
}
return nil
}
// UpdateExecution is used to update a job before and after execution.
func (s *JobStore) UpdateExecution(ctx context.Context, job *types.Job) error {
const sqlQuery = `
UPDATE jobs
SET
job_updated = :job_updated
,job_result = :job_result
,job_state = :job_state
,job_scheduled = :job_scheduled
,job_total_executions = :job_total_executions
,job_run_by = :job_run_by
,job_run_deadline = :job_run_deadline
,job_last_executed = :job_last_executed
,job_consecutive_failures = :job_consecutive_failures
,job_last_failure_error = :job_last_failure_error
WHERE job_uid = :job_uid`
db := dbtx.GetAccessor(ctx, s.db)
query, arg, err := db.BindNamed(sqlQuery, job)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to bind job object for update")
}
result, err := db.ExecContext(ctx, query, arg...)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to update job execution")
}
count, err := result.RowsAffected()
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to get number of updated rows")
}
if count == 0 {
return gitness_store.ErrResourceNotFound
}
return nil
}
func (s *JobStore) UpdateProgress(ctx context.Context, job *types.Job) error {
const sqlQuery = `
UPDATE jobs
SET
job_updated = :job_updated
,job_result = :job_result
,job_run_progress = :job_run_progress
WHERE job_uid = :job_uid AND job_state = 'running'`
db := dbtx.GetAccessor(ctx, s.db)
query, arg, err := db.BindNamed(sqlQuery, job)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to bind job object for update")
}
result, err := db.ExecContext(ctx, query, arg...)
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to update job progress")
}
count, err := result.RowsAffected()
if err != nil {
return database.ProcessSQLErrorf(err, "Failed to get number of updated rows")
}
if count == 0 {
return gitness_store.ErrResourceNotFound
}
return nil
}
// CountRunning returns the number of jobs that are currently being run.
func (s *JobStore) CountRunning(ctx context.Context) (int, error) {
stmt := database.Builder.
Select("count(*)").
From("jobs").
Where("job_state = ?", enum.JobStateRunning)
sql, args, err := stmt.ToSql()
if err != nil {
return 0, fmt.Errorf("failed to convert count running jobs query to sql: %w", err)
}
db := dbtx.GetAccessor(ctx, s.db)
var count int64
err = db.QueryRowContext(ctx, sql, args...).Scan(&count)
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed executing count running jobs query")
}
return int(count), nil
}
// ListReady returns a list of jobs that are ready for execution:
// jobs with state="scheduled" and a scheduled time in the past.
func (s *JobStore) ListReady(ctx context.Context, now time.Time, limit int) ([]*types.Job, error) {
stmt := database.Builder.
Select(jobColumns).
From("jobs").
Where("job_state = ?", enum.JobStateScheduled).
Where("job_scheduled <= ?", now.UnixMilli()).
OrderBy("job_priority desc, job_scheduled asc, job_uid asc").
Limit(uint64(limit))
sql, args, err := stmt.ToSql()
if err != nil {
return nil, fmt.Errorf("failed to convert list scheduled jobs query to sql: %w", err)
}
result := make([]*types.Job, 0)
db := dbtx.GetAccessor(ctx, s.db)
if err = db.SelectContext(ctx, &result, sql, args...); err != nil {
return nil, database.ProcessSQLErrorf(err, "failed to execute list scheduled jobs query")
}
return result, nil
}
// ListDeadlineExceeded returns a list of jobs that have exceeded their execution deadline.
func (s *JobStore) ListDeadlineExceeded(ctx context.Context, now time.Time) ([]*types.Job, error) {
stmt := database.Builder.
Select(jobColumns).
From("jobs").
Where("job_state = ?", enum.JobStateRunning).
Where("job_run_deadline < ?", now.UnixMilli()).
OrderBy("job_run_deadline asc")
sql, args, err := stmt.ToSql()
if err != nil {
return nil, fmt.Errorf("failed to convert list overdue jobs query to sql: %w", err)
}
result := make([]*types.Job, 0)
db := dbtx.GetAccessor(ctx, s.db)
if err = db.SelectContext(ctx, &result, sql, args...); err != nil {
return nil, database.ProcessSQLErrorf(err, "failed to execute list overdue jobs query")
}
return result, nil
}
// NextScheduledTime returns a scheduled time of the next ready job or zero time if no such job exists.
func (s *JobStore) NextScheduledTime(ctx context.Context, now time.Time) (time.Time, error) {
stmt := database.Builder.
Select("job_scheduled").
From("jobs").
Where("job_state = ?", enum.JobStateScheduled).
Where("job_scheduled > ?", now.UnixMilli()).
OrderBy("job_scheduled asc").
Limit(1)
query, args, err := stmt.ToSql()
if err != nil {
return time.Time{}, fmt.Errorf("failed to convert next scheduled time query to sql: %w", err)
}
db := dbtx.GetAccessor(ctx, s.db)
var result int64
err = db.QueryRowContext(ctx, query, args...).Scan(&result)
if errors.Is(err, sql.ErrNoRows) {
return time.Time{}, nil
}
if err != nil {
return time.Time{}, database.ProcessSQLErrorf(err, "failed to execute next scheduled time query")
}
return time.UnixMilli(result), nil
}
// DeleteOld removes non-recurring jobs that have finished execution or have failed.
func (s *JobStore) DeleteOld(ctx context.Context, olderThan time.Time) (int64, error) {
stmt := database.Builder.
Delete("jobs").
Where("(job_state = ? OR job_state = ?)", enum.JobStateFinished, enum.JobStateFailed).
Where("job_is_recurring = false").
Where("job_last_executed < ?", olderThan.UnixMilli())
sql, args, err := stmt.ToSql()
if err != nil {
return 0, fmt.Errorf("failed to convert delete done jobs query to sql: %w", err)
}
db := dbtx.GetAccessor(ctx, s.db)
result, err := db.ExecContext(ctx, sql, args...)
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to execute delete done jobs query")
}
n, err := result.RowsAffected()
if err != nil {
return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted jobs")
}
return n, nil
}

View File

@ -0,0 +1,4 @@
DROP INDEX jobs_last_executed;
DROP INDEX jobs_run_deadline;
DROP INDEX jobs_scheduled;
DROP TABLE jobs;

View File

@ -0,0 +1,35 @@
CREATE TABLE jobs (
job_uid TEXT NOT NULL
,job_created BIGINT NOT NULL
,job_updated BIGINT NOT NULL
,job_type TEXT NOT NULL
,job_priority INTEGER NOT NULL
,job_data TEXT NOT NULL
,job_result TEXT NOT NULL
,job_max_duration_seconds INTEGER NOT NULL
,job_max_retries INTEGER NOT NULL
,job_state TEXT NOT NULL
,job_scheduled BIGINT NOT NULL
,job_total_executions INTEGER
,job_run_by TEXT NOT NULL
,job_run_deadline BIGINT
,job_run_progress INTEGER NOT NULL
,job_last_executed BIGINT
,job_is_recurring BOOLEAN NOT NULL
,job_recurring_cron TEXT NOT NULL
,job_consecutive_failures INTEGER NOT NULL
,job_last_failure_error TEXT NOT NULL
,CONSTRAINT pk_jobs_uid PRIMARY KEY (job_uid)
);
CREATE INDEX jobs_scheduled
ON jobs(job_scheduled)
WHERE job_state = 'scheduled';
CREATE INDEX jobs_run_deadline
ON jobs(job_run_deadline)
WHERE job_state = 'running';
CREATE INDEX jobs_last_executed
ON jobs(job_last_executed)
WHERE job_state = 'finished' OR job_state = 'failed';

View File

@ -0,0 +1,4 @@
DROP INDEX jobs_last_executed;
DROP INDEX jobs_run_deadline;
DROP INDEX jobs_scheduled;
DROP TABLE jobs;

View File

@ -0,0 +1,35 @@
CREATE TABLE jobs (
job_uid TEXT NOT NULL
,job_created BIGINT NOT NULL
,job_updated BIGINT NOT NULL
,job_type TEXT NOT NULL
,job_priority INTEGER NOT NULL
,job_data TEXT NOT NULL
,job_result TEXT NOT NULL
,job_max_duration_seconds INTEGER NOT NULL
,job_max_retries INTEGER NOT NULL
,job_state TEXT NOT NULL
,job_scheduled BIGINT NOT NULL
,job_total_executions INTEGER
,job_run_by TEXT NOT NULL
,job_run_deadline BIGINT
,job_run_progress INTEGER NOT NULL
,job_last_executed BIGINT
,job_is_recurring BOOLEAN NOT NULL
,job_recurring_cron TEXT NOT NULL
,job_consecutive_failures INTEGER NOT NULL
,job_last_failure_error TEXT NOT NULL
,CONSTRAINT pk_jobs_uid PRIMARY KEY (job_uid)
);
CREATE INDEX jobs_scheduled
ON jobs(job_scheduled)
WHERE job_state = 'scheduled';
CREATE INDEX jobs_run_deadline
ON jobs(job_run_deadline)
WHERE job_state = 'running';
CREATE INDEX jobs_last_executed
ON jobs(job_last_executed)
WHERE job_state = 'finished' OR job_state = 'failed';

View File

@ -23,6 +23,7 @@ var WireSet = wire.NewSet(
ProvidePathStore,
ProvideSpaceStore,
ProvideRepoStore,
ProvideJobStore,
ProvideExecutionStore,
ProvidePipelineStore,
ProvideStageStore,
@ -87,6 +88,11 @@ func ProvideRepoStore(db *sqlx.DB, pathCache store.PathCache) store.RepoStore {
return NewRepoStore(db, pathCache)
}
// ProvideJobStore provides a job store.
func ProvideJobStore(db *sqlx.DB) store.JobStore {
return NewJobStore(db)
}
// ProvidePipelineStore provides a pipeline store.
func ProvidePipelineStore(db *sqlx.DB) store.PipelineStore {
return NewPipelineStore(db)

View File

@ -180,4 +180,13 @@ type Config struct {
SendTimeout time.Duration `envconfig:"GITNESS_PUBSUB_SEND_TIMEOUT" default:"60s"`
ChannelSize int `envconfig:"GITNESS_PUBSUB_CHANNEL_SIZE" default:"100"`
}
BackgroundJobs struct {
// MaxRunning is the maximum number of jobs that can be running at once.
MaxRunning int `envconfig:"GITNESS_JOBS_MAX_RUNNING" default:"10"`
// PurgeFinishedOlderThan is the duration after which non-recurring,
// finished and failed jobs will be purged from the DB.
PurgeFinishedOlderThan time.Duration `envconfig:"GITNESS_JOBS_PURGE_FINISHED_OLDER_THAN" default:"120h"`
}
}

25
types/enum/job.go Normal file
View File

@ -0,0 +1,25 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package enum
// JobState represents the state of a background job.
type JobState string
// JobState enumeration.
const (
JobStateScheduled JobState = "scheduled"
JobStateRunning JobState = "running"
JobStateFinished JobState = "finished"
JobStateFailed JobState = "failed"
)
// JobPriority represents the priority of a background job.
type JobPriority int
// JobPriority enumeration.
const (
JobPriorityNormal JobPriority = 0
JobPriorityElevated JobPriority = 1
)

38
types/job.go Normal file
View File

@ -0,0 +1,38 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package types
import "github.com/harness/gitness/types/enum"
type Job struct {
UID string `db:"job_uid"`
Created int64 `db:"job_created"`
Updated int64 `db:"job_updated"`
Type string `db:"job_type"`
Priority enum.JobPriority `db:"job_priority"`
Data string `db:"job_data"`
Result string `db:"job_result"`
MaxDurationSeconds int `db:"job_max_duration_seconds"`
MaxRetries int `db:"job_max_retries"`
State enum.JobState `db:"job_state"`
Scheduled int64 `db:"job_scheduled"`
TotalExecutions int `db:"job_total_executions"`
RunBy string `db:"job_run_by"`
RunDeadline int64 `db:"job_run_deadline"`
RunProgress int `db:"job_run_progress"`
LastExecuted int64 `db:"job_last_executed"`
IsRecurring bool `db:"job_is_recurring"`
RecurringCron string `db:"job_recurring_cron"`
ConsecutiveFailures int `db:"job_consecutive_failures"`
LastFailureError string `db:"job_last_failure_error"`
}
type JobStateChange struct {
UID string `json:"uid"`
State enum.JobState `json:"state"`
Progress int `json:"progress"`
Result string `json:"result"`
Failure string `json:"failure"`
}