From 1fd4de9ae74d0dfb67c97790623b145d0c282f04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Ga=C4=87e=C5=A1a?= Date: Mon, 11 Sep 2023 14:04:28 +0200 Subject: [PATCH] fix importing of empty and large repositories --- gitrpc/internal/gitea/repo.go | 5 +- gitrpc/internal/service/repo.go | 5 ++ internal/services/importer/repository.go | 66 +++++++++++++----------- internal/services/job/scheduler.go | 4 -- 4 files changed, 45 insertions(+), 35 deletions(-) diff --git a/gitrpc/internal/gitea/repo.go b/gitrpc/internal/gitea/repo.go index c119dcb7b..15c791211 100644 --- a/gitrpc/internal/gitea/repo.go +++ b/gitrpc/internal/gitea/repo.go @@ -69,6 +69,7 @@ func (g Adapter) GetDefaultBranch(ctx context.Context, repoPath string) (string, // If the repo doesn't have a default branch, types.ErrNoDefaultBranch is returned. func (g Adapter) GetRemoteDefaultBranch(ctx context.Context, remoteURL string) (string, error) { args := []string{ + "-c", "credential.helper=", "ls-remote", "--symref", "-q", @@ -118,6 +119,7 @@ func (g Adapter) Clone(ctx context.Context, from, to string, opts types.CloneRep func (g Adapter) Sync(ctx context.Context, repoPath string, remoteURL string) error { args := []string{ "-c", "advice.fetchShowForcedUpdates=false", + "-c", "credential.helper=", "fetch", "--quiet", "--prune", @@ -131,7 +133,8 @@ func (g Adapter) Sync(ctx context.Context, repoPath string, remoteURL string) er cmd := gitea.NewCommand(ctx, args...) _, _, err := cmd.RunStdString(&gitea.RunOpts{ - Dir: repoPath, + Dir: repoPath, + UseContextTimeout: true, }) if err != nil { return processGiteaErrorf(err, "failed to sync repo") diff --git a/gitrpc/internal/service/repo.go b/gitrpc/internal/service/repo.go index b84daf4e0..928ae2f9f 100644 --- a/gitrpc/internal/service/repo.go +++ b/gitrpc/internal/service/repo.go @@ -344,6 +344,11 @@ func (s RepositoryService) SyncRepository( // get remote default branch defaultBranch, err := s.adapter.GetRemoteDefaultBranch(ctx, request.GetSource()) + if errors.Is(err, types.ErrNoDefaultBranch) { + return &rpc.SyncRepositoryResponse{ + DefaultBranch: "", + }, nil + } if err != nil { return nil, processGitErrorf(err, "failed to get default branch from repo") } diff --git a/internal/services/importer/repository.go b/internal/services/importer/repository.go index 41a4982ee..a6e9add4a 100644 --- a/internal/services/importer/repository.go +++ b/internal/services/importer/repository.go @@ -49,11 +49,12 @@ type Input struct { const jobType = "repository_import" -func (i *Repository) Register(executor *job.Executor) error { - return executor.Register(jobType, i) +func (r *Repository) Register(executor *job.Executor) error { + return executor.Register(jobType, r) } -func (i *Repository) Run(ctx context.Context, provider Provider, repo *types.Repository, cloneURL string) error { +// Run starts a background job that imports the provided repository from the provided clone URL. +func (r *Repository) Run(ctx context.Context, provider Provider, repo *types.Repository, cloneURL string) error { input := Input{ RepoID: repo.ID, GitUser: provider.Username, @@ -68,7 +69,7 @@ func (i *Repository) Run(ctx context.Context, provider Provider, repo *types.Rep strData := strings.TrimSpace(string(data)) - return i.scheduler.RunJob(ctx, job.Definition{ + return r.scheduler.RunJob(ctx, job.Definition{ UID: *repo.ImportingJobUID, Type: jobType, MaxRetries: importJobMaxRetries, @@ -77,7 +78,8 @@ func (i *Repository) Run(ctx context.Context, provider Provider, repo *types.Rep }) } -func (i *Repository) RunMany(ctx context.Context, +// RunMany starts background jobs that import the provided repositories from the provided clone URLs. +func (r *Repository) RunMany(ctx context.Context, groupID string, provider Provider, repos []*types.Repository, @@ -119,7 +121,7 @@ func (i *Repository) RunMany(ctx context.Context, } } - err := i.scheduler.RunJobs(ctx, groupID, defs) + err := r.scheduler.RunJobs(ctx, groupID, defs) if err != nil { return fmt.Errorf("failed to run jobs: %w", err) } @@ -128,7 +130,7 @@ func (i *Repository) RunMany(ctx context.Context, } // Handle is repository import background job handler. -func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressReporter) (string, error) { +func (r *Repository) Handle(ctx context.Context, data string, _ job.ProgressReporter) (string, error) { systemPrincipal := bootstrap.NewSystemServiceSession().Principal var input Input @@ -150,7 +152,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo input.CloneURL = repoURL.String() } - repo, err := i.repoStore.Find(ctx, input.RepoID) + repo, err := r.repoStore.Find(ctx, input.RepoID) if err != nil { return "", fmt.Errorf("failed to find repo by id: %w", err) } @@ -159,7 +161,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo return "", fmt.Errorf("repository %s is not being imported", repo.UID) } - gitUID, err := i.createGitRepository(ctx, &systemPrincipal, repo.ID) + gitUID, err := r.createGitRepository(ctx, &systemPrincipal, repo.ID) if err != nil { return "", fmt.Errorf("failed to create empty git repository: %w", err) } @@ -167,12 +169,16 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo err = func() error { repo.GitUID = gitUID - defaultBranch, err := i.syncGitRepository(ctx, &systemPrincipal, repo, input.CloneURL) + defaultBranch, err := r.syncGitRepository(ctx, &systemPrincipal, repo, input.CloneURL) if err != nil { return fmt.Errorf("failed to sync git repository from '%s': %w", input.CloneURL, err) } - repo, err = i.repoStore.UpdateOptLock(ctx, repo, func(repo *types.Repository) error { + if defaultBranch == "" { + defaultBranch = r.defaultBranch + } + + repo, err = r.repoStore.UpdateOptLock(ctx, repo, func(repo *types.Repository) error { if !repo.Importing { return errors.New("repository has already finished importing") } @@ -190,7 +196,7 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo return nil }() if err != nil { - if errDel := i.deleteGitRepository(ctx, &systemPrincipal, repo); errDel != nil { + if errDel := r.deleteGitRepository(ctx, &systemPrincipal, repo); errDel != nil { log.Ctx(ctx).Err(err). Str("gitUID", gitUID). Msg("failed to delete git repository after failed import") @@ -202,13 +208,13 @@ func (i *Repository) Handle(ctx context.Context, data string, _ job.ProgressRepo return "", nil } -func (i *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) { +func (r *Repository) GetProgress(ctx context.Context, repo *types.Repository) (types.JobProgress, error) { if !repo.Importing || repo.ImportingJobUID == nil || *repo.ImportingJobUID == "" { // if the repo is not being imported, or it's job ID has been cleared (or never existed) return state=finished return job.DoneProgress(), nil } - progress, err := i.scheduler.GetJobProgress(ctx, *repo.ImportingJobUID) + progress, err := r.scheduler.GetJobProgress(ctx, *repo.ImportingJobUID) if errors.Is(err, gitness_store.ErrResourceNotFound) { // if the job is not found return state=failed return job.FailProgress(), nil @@ -220,12 +226,12 @@ func (i *Repository) GetProgress(ctx context.Context, repo *types.Repository) (t return progress, nil } -func (i *Repository) Cancel(ctx context.Context, repo *types.Repository) error { +func (r *Repository) Cancel(ctx context.Context, repo *types.Repository) error { if repo.ImportingJobUID == nil || *repo.ImportingJobUID == "" { return nil } - err := i.scheduler.CancelJob(ctx, *repo.ImportingJobUID) + err := r.scheduler.CancelJob(ctx, *repo.ImportingJobUID) if err != nil { return fmt.Errorf("failed to cancel job: %w", err) } @@ -233,24 +239,24 @@ func (i *Repository) Cancel(ctx context.Context, repo *types.Repository) error { return nil } -func (i *Repository) createGitRepository(ctx context.Context, +func (r *Repository) createGitRepository(ctx context.Context, principal *types.Principal, repoID int64, ) (string, error) { now := time.Now() - envVars, err := i.createEnvVars(ctx, principal, repoID) + envVars, err := r.createEnvVars(ctx, principal, repoID) if err != nil { return "", err } - resp, err := i.git.CreateRepository(ctx, &gitrpc.CreateRepositoryParams{ + resp, err := r.git.CreateRepository(ctx, &gitrpc.CreateRepositoryParams{ Actor: gitrpc.Identity{ Name: principal.DisplayName, Email: principal.Email, }, EnvVars: envVars, - DefaultBranch: i.defaultBranch, + DefaultBranch: r.defaultBranch, Files: nil, Author: &gitrpc.Identity{ Name: principal.DisplayName, @@ -270,17 +276,17 @@ func (i *Repository) createGitRepository(ctx context.Context, return resp.UID, nil } -func (i *Repository) syncGitRepository(ctx context.Context, +func (r *Repository) syncGitRepository(ctx context.Context, principal *types.Principal, repo *types.Repository, sourceCloneURL string, ) (string, error) { - writeParams, err := i.createRPCWriteParams(ctx, principal, repo) + writeParams, err := r.createRPCWriteParams(ctx, principal, repo) if err != nil { return "", err } - syncOut, err := i.git.SyncRepository(ctx, &gitrpc.SyncRepositoryParams{ + syncOut, err := r.git.SyncRepository(ctx, &gitrpc.SyncRepositoryParams{ WriteParams: writeParams, Source: sourceCloneURL, CreateIfNotExists: false, @@ -292,16 +298,16 @@ func (i *Repository) syncGitRepository(ctx context.Context, return syncOut.DefaultBranch, nil } -func (i *Repository) deleteGitRepository(ctx context.Context, +func (r *Repository) deleteGitRepository(ctx context.Context, principal *types.Principal, repo *types.Repository, ) error { - writeParams, err := i.createRPCWriteParams(ctx, principal, repo) + writeParams, err := r.createRPCWriteParams(ctx, principal, repo) if err != nil { return err } - err = i.git.DeleteRepository(ctx, &gitrpc.DeleteRepositoryParams{ + err = r.git.DeleteRepository(ctx, &gitrpc.DeleteRepositoryParams{ WriteParams: writeParams, }) if err != nil { @@ -311,11 +317,11 @@ func (i *Repository) deleteGitRepository(ctx context.Context, return nil } -func (i *Repository) createRPCWriteParams(ctx context.Context, +func (r *Repository) createRPCWriteParams(ctx context.Context, principal *types.Principal, repo *types.Repository, ) (gitrpc.WriteParams, error) { - envVars, err := i.createEnvVars(ctx, principal, repo.ID) + envVars, err := r.createEnvVars(ctx, principal, repo.ID) if err != nil { return gitrpc.WriteParams{}, err } @@ -330,13 +336,13 @@ func (i *Repository) createRPCWriteParams(ctx context.Context, }, nil } -func (i *Repository) createEnvVars(ctx context.Context, +func (r *Repository) createEnvVars(ctx context.Context, principal *types.Principal, repoID int64, ) (map[string]string, error) { envVars, err := githook.GenerateEnvironmentVariables( ctx, - i.urlProvider.GetAPIBaseURLInternal(), + r.urlProvider.GetAPIBaseURLInternal(), repoID, principal.ID, false, diff --git a/internal/services/job/scheduler.go b/internal/services/job/scheduler.go index f769e65f7..379327985 100644 --- a/internal/services/job/scheduler.go +++ b/internal/services/job/scheduler.go @@ -35,9 +35,6 @@ type Scheduler struct { maxRunning int purgeMinOldAge time.Duration - // global context - globalCtx context.Context - // synchronization stuff signal chan time.Time done chan struct{} @@ -104,7 +101,6 @@ func (s *Scheduler) Run(ctx context.Context) error { defer close(s.done) s.signal = make(chan time.Time, 1) - s.globalCtx = ctx timer := newSchedulerTimer() defer timer.Stop()