Merge branch 'mg/jobs/global_ctx' of _OKE5H2PQKOUfzFFDuD4FA/default/CODE/gitness (#400)

This commit is contained in:
Marko Gacesa 2023-09-05 19:26:15 +00:00 committed by Harness
commit 4a596edab2
3 changed files with 73 additions and 32 deletions

View File

@ -140,3 +140,21 @@ func (e *Executor) exec(
return exec.Handle(ctx, input, progressReporter) // runs the job
}
func DoneProgress() types.JobProgress {
return types.JobProgress{
State: enum.JobStateFinished,
Progress: ProgressMax,
Result: "",
Failure: "",
}
}
func FailProgress() types.JobProgress {
return types.JobProgress{
State: enum.JobStateFailed,
Progress: ProgressMax,
Result: "",
Failure: "",
}
}

View File

@ -36,6 +36,7 @@ type Scheduler struct {
purgeMinOldAge time.Duration
// synchronization stuff
globalCtx context.Context
signal chan time.Time
done chan struct{}
wgRunning sync.WaitGroup
@ -101,6 +102,7 @@ func (s *Scheduler) Run(ctx context.Context) error {
defer close(s.done)
s.signal = make(chan time.Time, 1)
s.globalCtx = ctx
timer := newSchedulerTimer()
defer timer.Stop()
@ -176,7 +178,7 @@ func (s *Scheduler) WaitJobsDone(ctx context.Context) {
}
}
// CancelJob cancels a currently running or scheduled job
// CancelJob cancels a currently running or scheduled job.
func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error {
mx, err := globalLock(ctx, s.mxManager)
if err != nil {
@ -309,28 +311,33 @@ func (s *Scheduler) startNewJobsNoLock(ctx context.Context, jobs []*types.Job) e
}
for _, job := range jobs {
ctx = log.Ctx(ctx).With().
Str("job.UID", job.UID).
Str("job.Type", job.Type).
Logger().WithContext(ctx)
if available > 0 {
available--
s.preExec(job) // Update the job fields for the new execution: It will be added to the DB as "running".
}
err = s.store.Create(ctx, job)
if err != nil {
return fmt.Errorf("failed to add new job to the database: %w", err)
}
if available == 0 {
// already running at the full capacity: the job is scheduled, we're done.
if job.State != enum.JobStateRunning {
continue
}
err = s.runJob(ctx, job)
if err != nil {
log.Ctx(ctx).Err(err).Msg("failed to run job")
continue
}
func(ctx context.Context) {
ctx = log.Ctx(ctx).With().
Str("job.UID", job.UID).
Str("job.Type", job.Type).
Logger().WithContext(ctx)
available--
// tell everybody that a job has started
if err := publishStateChange(ctx, s.pubsubService, job); err != nil {
log.Err(err).Msg("failed to publish job state change")
}
s.runJob(ctx, job)
}(s.globalCtx)
}
return nil
@ -394,14 +401,23 @@ func (s *Scheduler) processReadyJobs(ctx context.Context, now time.Time) (int, t
Str("job.Type", job.Type).
Logger().WithContext(ctx)
err = s.runJob(jobCtx, job)
if err != nil {
// Update the job fields for the new execution
s.preExec(job)
if err := s.store.UpdateExecution(ctx, job); err != nil {
knownNextExecTime = time.Time{}
gotAllJobs = false
log.Ctx(jobCtx).Err(err).Msg("failed to run job")
log.Ctx(jobCtx).Err(err).Msg("failed to update job to mark it as running")
continue
}
// tell everybody that a job has started
if err := publishStateChange(ctx, s.pubsubService, job); err != nil {
log.Ctx(jobCtx).Err(err).Msg("failed to publish job state change")
}
s.runJob(jobCtx, job)
countExecuted++
}
@ -424,19 +440,7 @@ func (s *Scheduler) availableSlots(ctx context.Context) (int, error) {
// runJob updates the job in the database and starts it in a separate goroutine.
// The function will also log the execution.
func (s *Scheduler) runJob(ctx context.Context, job *types.Job) error {
// Update the job fields for the new execution
s.preExec(job)
if err := s.store.UpdateExecution(ctx, job); err != nil {
return err
}
// tell everybody that a job has started
if err := publishStateChange(ctx, s.pubsubService, job); err != nil {
log.Err(err).Msg("failed to publish job state change")
}
func (s *Scheduler) runJob(ctx context.Context, j *types.Job) {
s.wgRunning.Add(1)
go func(ctx context.Context,
jobUID, jobType, jobData string,
@ -522,9 +526,7 @@ func (s *Scheduler) runJob(ctx context.Context, job *types.Job) error {
if err := publishStateChange(backgroundCtx, s.pubsubService, job); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to publish job state change")
}
}(ctx, job.UID, job.Type, job.Data, job.RunDeadline)
return nil
}(ctx, j.UID, j.Type, j.Data, j.RunDeadline)
}
// preExec updates the provided types.Job before execution.
@ -641,6 +643,20 @@ func postExec(job *types.Job, resultData, resultErr string) {
}
}
func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (types.JobProgress, error) {
job, err := s.store.Find(ctx, jobUID)
if err != nil {
return types.JobProgress{}, err
}
return types.JobProgress{
State: job.State,
Progress: job.RunProgress,
Result: job.Result,
Failure: job.LastFailureError,
}, nil
}
func (s *Scheduler) AddRecurring(
ctx context.Context,
jobUID,

View File

@ -36,3 +36,10 @@ type JobStateChange struct {
Result string `json:"result"`
Failure string `json:"failure"`
}
type JobProgress struct {
State enum.JobState `json:"state"`
Progress int `json:"progress"`
Result string `json:"result,omitempty"`
Failure string `json:"failure,omitempty"`
}