diff --git a/internal/services/exporter/harness_code_client.go b/internal/services/exporter/harness_code_client.go index 45dd89dc2..02d0680c4 100644 --- a/internal/services/exporter/harness_code_client.go +++ b/internal/services/exporter/harness_code_client.go @@ -30,6 +30,7 @@ var ( ErrNotFound = fmt.Errorf("not found") ErrBadRequest = fmt.Errorf("bad request") ErrInternal = fmt.Errorf("internal error") + ErrDuplicate = fmt.Errorf("resource already exists") ) type HarnessCodeClient struct { @@ -203,6 +204,8 @@ func mapStatusCodeToError(statusCode int) error { return ErrNotFound case statusCode == 400: return ErrBadRequest + case statusCode == 409: + return ErrDuplicate case statusCode >= 400: return fmt.Errorf("received client side error status code %d", statusCode) case statusCode >= 300: diff --git a/internal/services/exporter/repository.go b/internal/services/exporter/repository.go index bd03e3974..394cce468 100644 --- a/internal/services/exporter/repository.go +++ b/internal/services/exporter/repository.go @@ -8,12 +8,14 @@ import ( "context" "encoding/base64" "encoding/json" + "errors" "fmt" "github.com/harness/gitness/encrypt" "github.com/harness/gitness/internal/api/controller/repo" "github.com/harness/gitness/internal/sse" "github.com/harness/gitness/types/enum" "github.com/rs/zerolog/log" + "golang.org/x/exp/slices" "net/url" "strings" "time" @@ -56,9 +58,10 @@ const ( exportJobMaxDuration = 45 * time.Minute exportRepoJobUid = "export_repo_%d" exportSpaceJobUid = "export_space_%d" + jobType = "repository_export" ) -const jobType = "repository_export" +var ErrJobRunning = errors.New("an export job is already running") func (r *Repository) Register(executor *job.Executor) error { return executor.Register(jobType, r) @@ -66,6 +69,23 @@ func (r *Repository) Register(executor *job.Executor) error { func (r *Repository) RunMany(ctx context.Context, spaceId int64, harnessCodeInfo *HarnessCodeInfo, repos []*types.Repository) error { jobGroupId := getJobGroupId(spaceId) + + jobs, err := r.scheduler.GetJobProgressForGroup(ctx, jobGroupId) + if err != nil { + return fmt.Errorf("cannot get job progress before starting. %w", err) + } + + err = checkJobAlreadyRunning(jobs) + if err != nil { + return err + } + + n, err := r.scheduler.PurgeJobsByGroupId(ctx, jobGroupId) + if err != nil { + return err + } + log.Ctx(ctx).Info().Msgf("deleted %d old jobs", n) + jobDefinitions := make([]job.Definition, len(repos)) for i, repository := range repos { repoJobData := Input{ @@ -100,6 +120,17 @@ func (r *Repository) RunMany(ctx context.Context, spaceId int64, harnessCodeInfo return r.scheduler.RunJobs(ctx, jobGroupId, jobDefinitions) } +func checkJobAlreadyRunning(jobs []types.JobProgress) error { + if jobs != nil { + for _, j := range jobs { + if !slices.Contains(enum.GetCompletedJobState(), j.State) { + return ErrJobRunning + } + } + } + return nil +} + func getJobGroupId(spaceId int64) string { return fmt.Sprintf(exportSpaceJobUid, spaceId) } @@ -156,7 +187,6 @@ func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo } log.Info().Msgf("completed repository export for repo", repository.UID) - publishSSE(ctx, r, repository) return "", nil diff --git a/internal/services/job/scheduler.go b/internal/services/job/scheduler.go index 26e901c57..d318e2454 100644 --- a/internal/services/job/scheduler.go +++ b/internal/services/job/scheduler.go @@ -624,6 +624,14 @@ func (s *Scheduler) GetJobProgressForGroup(ctx context.Context, jobGroupUID stri return mapToProgressMany(job), nil } +func (s *Scheduler) PurgeJobsByGroupId(ctx context.Context, jobGroupUID string) (int64, error) { + n, err := s.store.DeleteByGroupID(ctx, jobGroupUID) + if err != nil { + return 0, err + } + return n, nil +} + func mapToProgressMany(jobs []*types.Job) []types.JobProgress { if jobs == nil { return nil diff --git a/internal/store/database.go b/internal/store/database.go index 691f21797..bc3575d56 100644 --- a/internal/store/database.go +++ b/internal/store/database.go @@ -448,6 +448,9 @@ type ( // ListByGroupID fetches all jobs for a group id ListByGroupID(ctx context.Context, groupId string) ([]*types.Job, error) + // DeleteByGroupID deletes all jobs for a group id + DeleteByGroupID(ctx context.Context, groupId string) (int64, error) + // Create is used to create a new job. Create(ctx context.Context, job *types.Job) error diff --git a/internal/store/database/job.go b/internal/store/database/job.go index 8ea265660..2b59d0d75 100644 --- a/internal/store/database/job.go +++ b/internal/store/database/job.go @@ -91,6 +91,32 @@ func (s *JobStore) ListByGroupID(ctx context.Context, groupId string) ([]*types. return result, nil } +// DeleteByGroupID deletes all jobs for a group id +func (s *JobStore) DeleteByGroupID(ctx context.Context, groupId string) (int64, error) { + stmt := database.Builder. + Delete("jobs"). + Where("(job_group_id = ?)", groupId) + + sql, args, err := stmt.ToSql() + if err != nil { + return 0, fmt.Errorf("failed to convert delete by group id jobs query to sql: %w", err) + } + + db := dbtx.GetAccessor(ctx, s.db) + + result, err := db.ExecContext(ctx, sql, args...) + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to execute delete jobs by group id query") + } + + n, err := result.RowsAffected() + if err != nil { + return 0, database.ProcessSQLErrorf(err, "failed to get number of deleted jobs in group") + } + + return n, nil +} + // Create creates a new job. func (s *JobStore) Create(ctx context.Context, job *types.Job) error { const sqlQuery = ` diff --git a/types/enum/job.go b/types/enum/job.go index 0bb05da3b..25fb09dad 100644 --- a/types/enum/job.go +++ b/types/enum/job.go @@ -24,3 +24,7 @@ const ( JobPriorityNormal JobPriority = 0 JobPriorityElevated JobPriority = 1 ) + +func GetCompletedJobState() []JobState { + return []JobState{JobStateFinished, JobStateCanceled, JobStateFinished} +}