drone/internal/pipeline/manager/teardown.go

345 lines
8.2 KiB
Go

// Copyright 2022 Harness Inc. All rights reserved.
// Use of this source code is governed by the Polyform Free Trial License
// that can be found in the LICENSE.md file for this repository.
package manager
import (
"context"
"strings"
"time"
"github.com/harness/gitness/internal/pipeline/checks"
"github.com/harness/gitness/internal/pipeline/scheduler"
"github.com/harness/gitness/internal/sse"
"github.com/harness/gitness/internal/store"
"github.com/harness/gitness/livelog"
gitness_store "github.com/harness/gitness/store"
"github.com/harness/gitness/types"
"github.com/harness/gitness/types/enum"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog/log"
)
type teardown struct {
Executions store.ExecutionStore
Checks store.CheckStore
Pipelines store.PipelineStore
SSEStreamer sse.Streamer
Logs livelog.LogStream
Scheduler scheduler.Scheduler
Repos store.RepoStore
Steps store.StepStore
Stages store.StageStore
}
func (t *teardown) do(ctx context.Context, stage *types.Stage) error {
log := log.With().
Int64("stage.id", stage.ID).
Logger()
log.Debug().Msg("manager: stage is complete. teardown")
execution, err := t.Executions.Find(noContext, stage.ExecutionID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find the execution")
return err
}
log = log.With().
Int64("execution.number", execution.Number).
Int64("execution.id", execution.ID).
Int64("repo.id", execution.RepoID).
Str("stage.status", string(stage.Status)).
Logger()
repo, err := t.Repos.Find(noContext, execution.RepoID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find the repository")
return err
}
for _, step := range stage.Steps {
if len(step.Error) > 500 {
step.Error = step.Error[:500]
}
err := t.Steps.Update(noContext, step)
if err != nil {
log = log.With().
Str("step.name", step.Name).
Int64("step.id", step.ID).
Err(err).
Logger()
log.Error().Msg("manager: cannot persist the step")
return err
}
}
if len(stage.Error) > 500 {
stage.Error = stage.Error[:500]
}
err = t.Stages.Update(noContext, stage)
if err != nil {
log.Error().Err(err).
Msg("manager: cannot update the stage")
return err
}
for _, step := range stage.Steps {
t.Logs.Delete(noContext, step.ID)
}
stages, err := t.Stages.ListWithSteps(noContext, execution.ID)
if err != nil {
log.Warn().Err(err).
Msg("manager: cannot get stages")
return err
}
err = t.cancelDownstream(ctx, stages)
if err != nil {
log.Error().Err(err).
Msg("manager: cannot cancel downstream builds")
return err
}
err = t.scheduleDownstream(ctx, stage, stages)
if err != nil {
log.Error().Err(err).
Msg("manager: cannot schedule downstream builds")
return err
}
if !isexecutionComplete(stages) {
log.Warn().Err(err).
Msg("manager: execution pending completion of additional stages")
return nil
}
log.Info().Msg("manager: execution is finished, teardown")
execution.Status = enum.CIStatusSuccess
execution.Finished = time.Now().UnixMilli()
for _, sibling := range stages {
if sibling.Status == enum.CIStatusKilled {
execution.Status = enum.CIStatusKilled
break
}
if sibling.Status == enum.CIStatusFailure {
execution.Status = enum.CIStatusFailure
break
}
if sibling.Status == enum.CIStatusError {
execution.Status = enum.CIStatusError
break
}
}
if execution.Started == 0 {
execution.Started = execution.Finished
}
err = t.Executions.Update(noContext, execution)
if err == gitness_store.ErrVersionConflict {
log.Warn().Err(err).
Msg("manager: execution updated by another goroutine")
return nil
}
if err != nil {
log.Warn().Err(err).
Msg("manager: cannot update the execution")
return err
}
execution.Stages = stages
err = t.SSEStreamer.Publish(noContext, repo.ParentID, enum.SSETypeExecutionCompleted, execution)
if err != nil {
log.Warn().Err(err).
Msg("manager: could not publish execution completed event")
}
pipeline, err := t.Pipelines.Find(ctx, execution.PipelineID)
if err != nil {
log.Error().Err(err).Msg("manager: cannot find pipeline")
return err
}
// try to write to the checks store - if not, log an error and continue
err = checks.Write(ctx, t.Checks, execution, pipeline)
if err != nil {
log.Error().Err(err).Msg("manager: could not write to checks store")
}
return nil
}
// cancelDownstream is a helper function that tests for
// downstream stages and cancels them based on the overall
// pipeline state.
func (t *teardown) cancelDownstream(
ctx context.Context,
stages []*types.Stage,
) error {
failed := false
for _, s := range stages {
// check pipeline state
if s.Status.IsFailed() {
failed = true
}
}
var errs error
for _, s := range stages {
if s.Status != enum.CIStatusWaitingOnDeps {
continue
}
var skip bool
if failed && !s.OnFailure {
skip = true
}
if !failed && !s.OnSuccess {
skip = true
}
if !skip {
continue
}
if !areDepsComplete(s, stages) {
continue
}
log := log.With().
Int64("stage.id", s.ID).
Bool("stage.on_success", s.OnSuccess).
Bool("stage.on_failure", s.OnFailure).
Bool("failed", failed).
Str("stage.depends_on", strings.Join(s.DependsOn, ",")).
Logger()
log.Debug().Msg("manager: skipping step")
s.Status = enum.CIStatusSkipped
s.Started = time.Now().UnixMilli()
s.Stopped = time.Now().UnixMilli()
err := t.Stages.Update(noContext, s)
if err == gitness_store.ErrVersionConflict {
t.resync(ctx, s)
continue
}
if err != nil {
log.Error().Err(err).
Msg("manager: cannot update stage status")
errs = multierror.Append(errs, err)
}
}
return errs
}
func isexecutionComplete(stages []*types.Stage) bool {
for _, stage := range stages {
switch stage.Status {
case enum.CIStatusPending,
enum.CIStatusRunning,
enum.CIStatusWaitingOnDeps,
enum.CIStatusDeclined,
enum.CIStatusBlocked:
return false
}
}
return true
}
func areDepsComplete(stage *types.Stage, stages []*types.Stage) bool {
deps := map[string]struct{}{}
for _, dep := range stage.DependsOn {
deps[dep] = struct{}{}
}
for _, sibling := range stages {
if _, ok := deps[sibling.Name]; !ok {
continue
}
if !sibling.Status.IsDone() {
return false
}
}
return true
}
// scheduleDownstream is a helper function that tests for
// downstream stages and schedules stages if all dependencies
// and execution requirements are met.
func (t *teardown) scheduleDownstream(
ctx context.Context,
stage *types.Stage,
stages []*types.Stage,
) error {
var errs error
for _, sibling := range stages {
if sibling.Status == enum.CIStatusWaitingOnDeps {
if len(sibling.DependsOn) == 0 {
continue
}
// PROBLEM: isDep only checks the direct parent
// i think ....
// if isDep(stage, sibling) == false {
// continue
// }
if !areDepsComplete(sibling, stages) {
continue
}
// if isLastDep(stage, sibling, stages) == false {
// continue
// }
log := log.With().
Int64("stage.id", sibling.ID).
Str("stage.name", sibling.Name).
Str("stage.depends_on", strings.Join(sibling.DependsOn, ",")).
Logger()
log.Debug().Msg("manager: schedule next stage")
sibling.Status = enum.CIStatusPending
err := t.Stages.Update(noContext, sibling)
if err == gitness_store.ErrVersionConflict {
t.resync(ctx, sibling)
continue
}
if err != nil {
log.Error().Err(err).
Msg("manager: cannot update stage status")
errs = multierror.Append(errs, err)
}
err = t.Scheduler.Schedule(noContext, sibling)
if err != nil {
log.Error().Err(err).
Msg("manager: cannot schedule stage")
errs = multierror.Append(errs, err)
}
}
}
return errs
}
// resync updates the stage from the database. Note that it does
// not update the Version field. This is by design. It prevents
// the current go routine from updating a stage that has been
// updated by another go routine.
func (t *teardown) resync(ctx context.Context, stage *types.Stage) error {
updated, err := t.Stages.Find(ctx, stage.ID)
if err != nil {
return err
}
stage.Status = updated.Status
stage.Error = updated.Error
stage.ExitCode = updated.ExitCode
stage.Machine = updated.Machine
stage.Started = updated.Started
stage.Stopped = updated.Stopped
return nil
}