Merge branch 'mg/import/fix_repo_import_empty_and_large' of _OKE5H2PQKOUfzFFDuD4FA/default/CODE/gitness (#427)

This commit is contained in:
Marko Gacesa 2023-09-11 12:05:27 +00:00 committed by Harness
commit c71ebc99f1
4 changed files with 45 additions and 35 deletions

View File

@ -69,6 +69,7 @@ func (g Adapter) GetDefaultBranch(ctx context.Context, repoPath string) (string,
// If the repo doesn't have a default branch, types.ErrNoDefaultBranch is returned. // If the repo doesn't have a default branch, types.ErrNoDefaultBranch is returned.
func (g Adapter) GetRemoteDefaultBranch(ctx context.Context, remoteURL string) (string, error) { func (g Adapter) GetRemoteDefaultBranch(ctx context.Context, remoteURL string) (string, error) {
args := []string{ args := []string{
"-c", "credential.helper=",
"ls-remote", "ls-remote",
"--symref", "--symref",
"-q", "-q",
@ -118,6 +119,7 @@ func (g Adapter) Clone(ctx context.Context, from, to string, opts types.CloneRep
func (g Adapter) Sync(ctx context.Context, repoPath string, remoteURL string) error { func (g Adapter) Sync(ctx context.Context, repoPath string, remoteURL string) error {
args := []string{ args := []string{
"-c", "advice.fetchShowForcedUpdates=false", "-c", "advice.fetchShowForcedUpdates=false",
"-c", "credential.helper=",
"fetch", "fetch",
"--quiet", "--quiet",
"--prune", "--prune",
@ -131,7 +133,8 @@ func (g Adapter) Sync(ctx context.Context, repoPath string, remoteURL string) er
cmd := gitea.NewCommand(ctx, args...) cmd := gitea.NewCommand(ctx, args...)
_, _, err := cmd.RunStdString(&gitea.RunOpts{ _, _, err := cmd.RunStdString(&gitea.RunOpts{
Dir: repoPath, Dir: repoPath,
UseContextTimeout: true,
}) })
if err != nil { if err != nil {
return processGiteaErrorf(err, "failed to sync repo") return processGiteaErrorf(err, "failed to sync repo")

View File

@ -344,6 +344,11 @@ func (s RepositoryService) SyncRepository(
// get remote default branch // get remote default branch
defaultBranch, err := s.adapter.GetRemoteDefaultBranch(ctx, request.GetSource()) defaultBranch, err := s.adapter.GetRemoteDefaultBranch(ctx, request.GetSource())
if errors.Is(err, types.ErrNoDefaultBranch) {
return &rpc.SyncRepositoryResponse{
DefaultBranch: "",
}, nil
}
if err != nil { if err != nil {
return nil, processGitErrorf(err, "failed to get default branch from repo") return nil, processGitErrorf(err, "failed to get default branch from repo")
} }

View File

@ -49,11 +49,12 @@ type Input struct {
const jobType = "repository_import" const jobType = "repository_import"
func (i *Repository) Register(executor *job.Executor) error { func (r *Repository) Register(executor *job.Executor) error {
return executor.Register(jobType, i) return executor.Register(jobType, r)
} }
func (i *Repository) Run(ctx context.Context, provider Provider, repo *types.Repository, cloneURL string) error { // Run starts a background job that imports the provided repository from the provided clone URL.
func (r *Repository) Run(ctx context.Context, provider Provider, repo *types.Repository, cloneURL string) error {
input := Input{ input := Input{
RepoID: repo.ID, RepoID: repo.ID,
GitUser: provider.Username, GitUser: provider.Username,
@ -68,7 +69,7 @@ func (i *Repository) Run(ctx context.Context, provider Provider, repo *types.Rep
strData := strings.TrimSpace(string(data)) strData := strings.TrimSpace(string(data))
return i.scheduler.RunJob(ctx, job.Definition{ return r.scheduler.RunJob(ctx, job.Definition{
UID: *repo.ImportingJobUID, UID: *repo.ImportingJobUID,
Type: jobType, Type: jobType,
MaxRetries: importJobMaxRetries, MaxRetries: importJobMaxRetries,
@ -77,7 +78,8 @@ func (i *Repository) Run(ctx context.Context, provider Provider, repo *types.Rep
}) })
} }
func (i *Repository) RunMany(ctx context.Context, // RunMany starts background jobs that import the provided repositories from the provided clone URLs.
func (r *Repository) RunMany(ctx context.Context,
groupID string, groupID string,
provider Provider, provider Provider,
repos []*types.Repository, repos []*types.Repository,
@ -119,7 +121,7 @@ func (i *Repository) RunMany(ctx context.Context,
} }
} }
err := i.scheduler.RunJobs(ctx, groupID, defs) err := r.scheduler.RunJobs(ctx, groupID, defs)
if err != nil { if err != nil {
return fmt.Errorf("failed to run jobs: %w", err) return fmt.Errorf("failed to run jobs: %w", err)
} }
@ -128,7 +130,7 @@ func (i *Repository) RunMany(ctx context.Context,
} }
// Handle is repository import background job handler. // Handle is repository import background job handler.
func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressReporter) (string, error) { func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressReporter) (string, error) {
systemPrincipal := bootstrap.NewSystemServiceSession().Principal systemPrincipal := bootstrap.NewSystemServiceSession().Principal
var input Input var input Input
@ -150,7 +152,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
input.CloneURL = repoURL.String() input.CloneURL = repoURL.String()
} }
repo, err := i.repoStore.Find(ctx, input.RepoID) repo, err := r.repoStore.Find(ctx, input.RepoID)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to find repo by id: %w", err) return "", fmt.Errorf("failed to find repo by id: %w", err)
} }
@ -159,7 +161,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
return "", fmt.Errorf("repository %s is not being imported", repo.UID) return "", fmt.Errorf("repository %s is not being imported", repo.UID)
} }
gitUID, err := i.createGitRepository(ctx, &systemPrincipal, repo.ID) gitUID, err := r.createGitRepository(ctx, &systemPrincipal, repo.ID)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to create empty git repository: %w", err) return "", fmt.Errorf("failed to create empty git repository: %w", err)
} }
@ -167,12 +169,16 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
err = func() error { err = func() error {
repo.GitUID = gitUID repo.GitUID = gitUID
defaultBranch, err := i.syncGitRepository(ctx, &systemPrincipal, repo, input.CloneURL) defaultBranch, err := r.syncGitRepository(ctx, &systemPrincipal, repo, input.CloneURL)
if err != nil { if err != nil {
return fmt.Errorf("failed to sync git repository from '%s': %w", input.CloneURL, err) return fmt.Errorf("failed to sync git repository from '%s': %w", input.CloneURL, err)
} }
repo, err = i.repoStore.UpdateOptLock(ctx, repo, func(repo *types.Repository) error { if defaultBranch == "" {
defaultBranch = r.defaultBranch
}
repo, err = r.repoStore.UpdateOptLock(ctx, repo, func(repo *types.Repository) error {
if !repo.Importing { if !repo.Importing {
return errors.New("repository has already finished importing") return errors.New("repository has already finished importing")
} }
@ -190,7 +196,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
return nil return nil
}() }()
if err != nil { if err != nil {
if errDel := i.deleteGitRepository(ctx, &systemPrincipal, repo); errDel != nil { if errDel := r.deleteGitRepository(ctx, &systemPrincipal, repo); errDel != nil {
log.Ctx(ctx).Err(err). log.Ctx(ctx).Err(err).
Str("gitUID", gitUID). Str("gitUID", gitUID).
Msg("failed to delete git repository after failed import") Msg("failed to delete git repository after failed import")
@ -202,13 +208,13 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo
return "", nil return "", nil
} }
func (i *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) { func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) {
if !repo.Importing || repo.ImportingJobUID == nil || *repo.ImportingJobUID == "" { if !repo.Importing || repo.ImportingJobUID == nil || *repo.ImportingJobUID == "" {
// if the repo is not being imported, or it's job ID has been cleared (or never existed) return state=finished // if the repo is not being imported, or it's job ID has been cleared (or never existed) return state=finished
return job.DoneProgress(), nil return job.DoneProgress(), nil
} }
progress, err := i.scheduler.GetJobProgress(ctx, *repo.ImportingJobUID) progress, err := r.scheduler.GetJobProgress(ctx, *repo.ImportingJobUID)
if errors.Is(err, gitness_store.ErrResourceNotFound) { if errors.Is(err, gitness_store.ErrResourceNotFound) {
// if the job is not found return state=failed // if the job is not found return state=failed
return job.FailProgress(), nil return job.FailProgress(), nil
@ -220,12 +226,12 @@ func (i *Repository) GetProgress(ctx context.Context, repo *types.Repository) (t
return progress, nil return progress, nil
} }
func (i *Repository) Cancel(ctx context.Context, repo *types.Repository) error { func (r *Repository) Cancel(ctx context.Context, repo *types.Repository) error {
if repo.ImportingJobUID == nil || *repo.ImportingJobUID == "" { if repo.ImportingJobUID == nil || *repo.ImportingJobUID == "" {
return nil return nil
} }
err := i.scheduler.CancelJob(ctx, *repo.ImportingJobUID) err := r.scheduler.CancelJob(ctx, *repo.ImportingJobUID)
if err != nil { if err != nil {
return fmt.Errorf("failed to cancel job: %w", err) return fmt.Errorf("failed to cancel job: %w", err)
} }
@ -233,24 +239,24 @@ func (i *Repository) Cancel(ctx context.Context, repo *types.Repository) error {
return nil return nil
} }
func (i *Repository) createGitRepository(ctx context.Context, func (r *Repository) createGitRepository(ctx context.Context,
principal *types.Principal, principal *types.Principal,
repoID int64, repoID int64,
) (string, error) { ) (string, error) {
now := time.Now() now := time.Now()
envVars, err := i.createEnvVars(ctx, principal, repoID) envVars, err := r.createEnvVars(ctx, principal, repoID)
if err != nil { if err != nil {
return "", err return "", err
} }
resp, err := i.git.CreateRepository(ctx, &gitrpc.CreateRepositoryParams{ resp, err := r.git.CreateRepository(ctx, &gitrpc.CreateRepositoryParams{
Actor: gitrpc.Identity{ Actor: gitrpc.Identity{
Name: principal.DisplayName, Name: principal.DisplayName,
Email: principal.Email, Email: principal.Email,
}, },
EnvVars: envVars, EnvVars: envVars,
DefaultBranch: i.defaultBranch, DefaultBranch: r.defaultBranch,
Files: nil, Files: nil,
Author: &gitrpc.Identity{ Author: &gitrpc.Identity{
Name: principal.DisplayName, Name: principal.DisplayName,
@ -270,17 +276,17 @@ func (i *Repository) createGitRepository(ctx context.Context,
return resp.UID, nil return resp.UID, nil
} }
func (i *Repository) syncGitRepository(ctx context.Context, func (r *Repository) syncGitRepository(ctx context.Context,
principal *types.Principal, principal *types.Principal,
repo *types.Repository, repo *types.Repository,
sourceCloneURL string, sourceCloneURL string,
) (string, error) { ) (string, error) {
writeParams, err := i.createRPCWriteParams(ctx, principal, repo) writeParams, err := r.createRPCWriteParams(ctx, principal, repo)
if err != nil { if err != nil {
return "", err return "", err
} }
syncOut, err := i.git.SyncRepository(ctx, &gitrpc.SyncRepositoryParams{ syncOut, err := r.git.SyncRepository(ctx, &gitrpc.SyncRepositoryParams{
WriteParams: writeParams, WriteParams: writeParams,
Source: sourceCloneURL, Source: sourceCloneURL,
CreateIfNotExists: false, CreateIfNotExists: false,
@ -292,16 +298,16 @@ func (i *Repository) syncGitRepository(ctx context.Context,
return syncOut.DefaultBranch, nil return syncOut.DefaultBranch, nil
} }
func (i *Repository) deleteGitRepository(ctx context.Context, func (r *Repository) deleteGitRepository(ctx context.Context,
principal *types.Principal, principal *types.Principal,
repo *types.Repository, repo *types.Repository,
) error { ) error {
writeParams, err := i.createRPCWriteParams(ctx, principal, repo) writeParams, err := r.createRPCWriteParams(ctx, principal, repo)
if err != nil { if err != nil {
return err return err
} }
err = i.git.DeleteRepository(ctx, &gitrpc.DeleteRepositoryParams{ err = r.git.DeleteRepository(ctx, &gitrpc.DeleteRepositoryParams{
WriteParams: writeParams, WriteParams: writeParams,
}) })
if err != nil { if err != nil {
@ -311,11 +317,11 @@ func (i *Repository) deleteGitRepository(ctx context.Context,
return nil return nil
} }
func (i *Repository) createRPCWriteParams(ctx context.Context, func (r *Repository) createRPCWriteParams(ctx context.Context,
principal *types.Principal, principal *types.Principal,
repo *types.Repository, repo *types.Repository,
) (gitrpc.WriteParams, error) { ) (gitrpc.WriteParams, error) {
envVars, err := i.createEnvVars(ctx, principal, repo.ID) envVars, err := r.createEnvVars(ctx, principal, repo.ID)
if err != nil { if err != nil {
return gitrpc.WriteParams{}, err return gitrpc.WriteParams{}, err
} }
@ -330,13 +336,13 @@ func (i *Repository) createRPCWriteParams(ctx context.Context,
}, nil }, nil
} }
func (i *Repository) createEnvVars(ctx context.Context, func (r *Repository) createEnvVars(ctx context.Context,
principal *types.Principal, principal *types.Principal,
repoID int64, repoID int64,
) (map[string]string, error) { ) (map[string]string, error) {
envVars, err := githook.GenerateEnvironmentVariables( envVars, err := githook.GenerateEnvironmentVariables(
ctx, ctx,
i.urlProvider.GetAPIBaseURLInternal(), r.urlProvider.GetAPIBaseURLInternal(),
repoID, repoID,
principal.ID, principal.ID,
false, false,

View File

@ -35,9 +35,6 @@ type Scheduler struct {
maxRunning int maxRunning int
purgeMinOldAge time.Duration purgeMinOldAge time.Duration
// global context
globalCtx context.Context
// synchronization stuff // synchronization stuff
signal chan time.Time signal chan time.Time
done chan struct{} done chan struct{}
@ -104,7 +101,6 @@ func (s *Scheduler) Run(ctx context.Context) error {
defer close(s.done) defer close(s.done)
s.signal = make(chan time.Time, 1) s.signal = make(chan time.Time, 1)
s.globalCtx = ctx
timer := newSchedulerTimer() timer := newSchedulerTimer()
defer timer.Stop() defer timer.Stop()