Merge branch 'mg/jobs/start_refactor' of _OKE5H2PQKOUfzFFDuD4FA/default/CODE/gitness (#396)

This commit is contained in:
Marko Gacesa 2023-09-04 09:22:04 +00:00 committed by Harness
commit d1b5128d63
2 changed files with 119 additions and 67 deletions

View File

@ -0,0 +1,67 @@
// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package job
import (
"errors"
"time"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
)
type Definition struct {
UID string
Type string
MaxRetries int
Timeout time.Duration
Data string
}
func (def *Definition) Validate() error {
if def.Type == "" {
return errors.New("job Type must not be empty")
}
if def.UID == "" {
return errors.New("job must have unique identifier")
}
if def.MaxRetries < 0 {
return errors.New("job MaxRetries must be positive")
}
if def.Timeout < time.Second {
return errors.New("job Timeout too short")
}
return nil
}
func (def *Definition) toNewJob() *types.Job {
nowMilli := time.Now().UnixMilli()
return &types.Job{
UID: def.UID,
Created: nowMilli,
Updated: nowMilli,
Type: def.Type,
Priority: enum.JobPriorityNormal,
Data: def.Data,
Result: "",
MaxDurationSeconds: int(def.Timeout / time.Second),
MaxRetries: def.MaxRetries,
State: enum.JobStateScheduled,
Scheduled: nowMilli,
TotalExecutions: 0,
RunBy: "",
RunDeadline: nowMilli,
RunProgress: ProgressMin,
LastExecuted: 0, // never executed
IsRecurring: false,
RecurringCron: "",
ConsecutiveFailures: 0,
LastFailureError: "",
}
}

View File

@ -261,94 +261,79 @@ func (s *Scheduler) scheduleIfHaveMoreJobs() {
s.scheduleProcessing(time.Time{}) // zero time will trigger the timer if it's edgy
}
// RunJob runs a single job of the provided type.
// All parameters a job receives must be inside the data string (as JSON or whatever the job handler can interpret).
// The caller gets the job UID which can then by used to track execution (the job state and progress percentage).
func (s *Scheduler) RunJob(ctx context.Context,
jobType string,
maxRetries int,
timeout time.Duration,
data string,
) (string, error) {
if jobType == "" {
return "", errors.New("jobType must not be empty")
// RunJob runs a single job of the type Definition.Type.
// All parameters a job Handler receives must be inside the Definition.Data string
// (as JSON or whatever the job Handler can interpret).
func (s *Scheduler) RunJob(ctx context.Context, def Definition) error {
if err := def.Validate(); err != nil {
return err
}
if maxRetries < 0 {
return "", errors.New("maxRetries must be positive")
return s.startNewJobs(ctx, []*types.Job{def.toNewJob()})
}
// RunJobs runs a several jobs. It's more efficient than calling RunJob several times
// because it locks the DB only once.
// TODO: Add groupID parameter and use it for all jobs.
func (s *Scheduler) RunJobs(ctx context.Context, defs []Definition) error {
jobs := make([]*types.Job, len(defs))
for i, def := range defs {
if err := def.Validate(); err != nil {
return err
}
jobs[i] = def.toNewJob()
}
if timeout < time.Second {
return "", errors.New("timeout too short")
}
uid, err := UID()
if err != nil {
return "", fmt.Errorf("failed to generate job UID: %w", err)
}
nowMilli := time.Now().UnixMilli()
job := &types.Job{
UID: uid,
Created: nowMilli,
Updated: nowMilli,
Type: jobType,
Priority: enum.JobPriorityNormal,
Data: data,
Result: "",
MaxDurationSeconds: int(timeout / time.Second),
MaxRetries: maxRetries,
State: enum.JobStateScheduled,
Scheduled: nowMilli,
TotalExecutions: 0,
RunBy: "",
RunDeadline: nowMilli,
RunProgress: ProgressMin,
LastExecuted: 0, // never executed
IsRecurring: false,
RecurringCron: "",
ConsecutiveFailures: 0,
LastFailureError: "",
}
ctx = log.Ctx(ctx).With().
Str("job.UID", job.UID).
Str("job.Type", job.Type).
Logger().WithContext(ctx)
return s.startNewJobs(ctx, jobs)
}
func (s *Scheduler) startNewJobs(ctx context.Context, jobs []*types.Job) error {
mx, err := globalLock(ctx, s.mxManager)
if err != nil {
return "", fmt.Errorf("failed to obtain global lock to add new job: %w", err)
return fmt.Errorf("failed to obtain global lock to start new jobs: %w", err)
}
defer func() {
if err := mx.Unlock(ctx); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to release global lock after adding a new job")
log.Ctx(ctx).Err(err).Msg("failed to release global lock after starting new jobs")
}
}()
err = s.store.Create(ctx, job)
if err != nil {
return "", fmt.Errorf("failed to add new job to the database: %w", err)
}
return s.startNewJobsNoLock(ctx, jobs)
}
func (s *Scheduler) startNewJobsNoLock(ctx context.Context, jobs []*types.Job) error {
available, err := s.availableSlots(ctx)
if err != nil {
return "", fmt.Errorf("failed to count available slots for job execution: %w", err)
return fmt.Errorf("failed to count available slots for job execution: %w", err)
}
if available == 0 {
// already running at the full capacity: the job is scheduled, we're done.
return uid, nil
for _, job := range jobs {
ctx = log.Ctx(ctx).With().
Str("job.UID", job.UID).
Str("job.Type", job.Type).
Logger().WithContext(ctx)
err = s.store.Create(ctx, job)
if err != nil {
return fmt.Errorf("failed to add new job to the database: %w", err)
}
if available == 0 {
// already running at the full capacity: the job is scheduled, we're done.
continue
}
err = s.runJob(ctx, job)
if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to run job")
continue
}
available--
}
err = s.runJob(ctx, job)
if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to run job")
}
return uid, nil
return nil
}
// processReadyJobs executes jobs that are ready to run. This function is periodically run by the Scheduler.