diff --git a/internal/services/job/executor.go b/internal/services/job/executor.go index 7f80917d3..1b0cb6849 100644 --- a/internal/services/job/executor.go +++ b/internal/services/job/executor.go @@ -140,3 +140,21 @@ func (e *Executor) exec( return exec.Handle(ctx, input, progressReporter) // runs the job } + +func DoneProgress() types.JobProgress { + return types.JobProgress{ + State: enum.JobStateFinished, + Progress: ProgressMax, + Result: "", + Failure: "", + } +} + +func FailProgress() types.JobProgress { + return types.JobProgress{ + State: enum.JobStateFailed, + Progress: ProgressMax, + Result: "", + Failure: "", + } +} diff --git a/internal/services/job/scheduler.go b/internal/services/job/scheduler.go index b8d402a82..d82afae38 100644 --- a/internal/services/job/scheduler.go +++ b/internal/services/job/scheduler.go @@ -36,6 +36,7 @@ type Scheduler struct { purgeMinOldAge time.Duration // synchronization stuff + globalCtx context.Context signal chan time.Time done chan struct{} wgRunning sync.WaitGroup @@ -101,6 +102,7 @@ func (s *Scheduler) Run(ctx context.Context) error { defer close(s.done) s.signal = make(chan time.Time, 1) + s.globalCtx = ctx timer := newSchedulerTimer() defer timer.Stop() @@ -176,7 +178,7 @@ func (s *Scheduler) WaitJobsDone(ctx context.Context) { } } -// CancelJob cancels a currently running or scheduled job +// CancelJob cancels a currently running or scheduled job. func (s *Scheduler) CancelJob(ctx context.Context, jobUID string) error { mx, err := globalLock(ctx, s.mxManager) if err != nil { @@ -309,28 +311,33 @@ func (s *Scheduler) startNewJobsNoLock(ctx context.Context, jobs []*types.Job) e } for _, job := range jobs { - ctx = log.Ctx(ctx).With(). - Str("job.UID", job.UID). - Str("job.Type", job.Type). - Logger().WithContext(ctx) + if available > 0 { + available-- + s.preExec(job) // Update the job fields for the new execution: It will be added to the DB as "running". + } err = s.store.Create(ctx, job) if err != nil { return fmt.Errorf("failed to add new job to the database: %w", err) } - if available == 0 { - // already running at the full capacity: the job is scheduled, we're done. + if job.State != enum.JobStateRunning { continue } - err = s.runJob(ctx, job) - if err != nil { - log.Ctx(ctx).Err(err).Msg("failed to run job") - continue - } + func(ctx context.Context) { + ctx = log.Ctx(ctx).With(). + Str("job.UID", job.UID). + Str("job.Type", job.Type). + Logger().WithContext(ctx) - available-- + // tell everybody that a job has started + if err := publishStateChange(ctx, s.pubsubService, job); err != nil { + log.Err(err).Msg("failed to publish job state change") + } + + s.runJob(ctx, job) + }(s.globalCtx) } return nil @@ -394,14 +401,23 @@ func (s *Scheduler) processReadyJobs(ctx context.Context, now time.Time) (int, t Str("job.Type", job.Type). Logger().WithContext(ctx) - err = s.runJob(jobCtx, job) - if err != nil { + // Update the job fields for the new execution + s.preExec(job) + + if err := s.store.UpdateExecution(ctx, job); err != nil { knownNextExecTime = time.Time{} gotAllJobs = false - log.Ctx(jobCtx).Err(err).Msg("failed to run job") + log.Ctx(jobCtx).Err(err).Msg("failed to update job to mark it as running") continue } + // tell everybody that a job has started + if err := publishStateChange(ctx, s.pubsubService, job); err != nil { + log.Ctx(jobCtx).Err(err).Msg("failed to publish job state change") + } + + s.runJob(jobCtx, job) + countExecuted++ } @@ -424,19 +440,7 @@ func (s *Scheduler) availableSlots(ctx context.Context) (int, error) { // runJob updates the job in the database and starts it in a separate goroutine. // The function will also log the execution. -func (s *Scheduler) runJob(ctx context.Context, job *types.Job) error { - // Update the job fields for the new execution - s.preExec(job) - - if err := s.store.UpdateExecution(ctx, job); err != nil { - return err - } - - // tell everybody that a job has started - if err := publishStateChange(ctx, s.pubsubService, job); err != nil { - log.Err(err).Msg("failed to publish job state change") - } - +func (s *Scheduler) runJob(ctx context.Context, j *types.Job) { s.wgRunning.Add(1) go func(ctx context.Context, jobUID, jobType, jobData string, @@ -522,9 +526,7 @@ func (s *Scheduler) runJob(ctx context.Context, job *types.Job) error { if err := publishStateChange(backgroundCtx, s.pubsubService, job); err != nil { log.Ctx(ctx).Err(err).Msg("failed to publish job state change") } - }(ctx, job.UID, job.Type, job.Data, job.RunDeadline) - - return nil + }(ctx, j.UID, j.Type, j.Data, j.RunDeadline) } // preExec updates the provided types.Job before execution. @@ -641,6 +643,20 @@ func postExec(job *types.Job, resultData, resultErr string) { } } +func (s *Scheduler) GetJobProgress(ctx context.Context, jobUID string) (types.JobProgress, error) { + job, err := s.store.Find(ctx, jobUID) + if err != nil { + return types.JobProgress{}, err + } + + return types.JobProgress{ + State: job.State, + Progress: job.RunProgress, + Result: job.Result, + Failure: job.LastFailureError, + }, nil +} + func (s *Scheduler) AddRecurring( ctx context.Context, jobUID, diff --git a/types/job.go b/types/job.go index e5ddb9cee..37ae01c50 100644 --- a/types/job.go +++ b/types/job.go @@ -36,3 +36,10 @@ type JobStateChange struct { Result string `json:"result"` Failure string `json:"failure"` } + +type JobProgress struct { + State enum.JobState `json:"state"` + Progress int `json:"progress"` + Result string `json:"result,omitempty"` + Failure string `json:"failure,omitempty"` +}