diff --git a/cli/server/harness.wire.go b/cli/server/harness.wire.go index a7432a4b6..47c285c81 100644 --- a/cli/server/harness.wire.go +++ b/cli/server/harness.wire.go @@ -13,6 +13,7 @@ import ( "github.com/harness/gitness/events" "github.com/harness/gitness/gitrpc" gitrpcserver "github.com/harness/gitness/gitrpc/server" + gitrpccron "github.com/harness/gitness/gitrpc/server/cron" "github.com/harness/gitness/harness/auth/authn" "github.com/harness/gitness/harness/auth/authz" "github.com/harness/gitness/harness/bootstrap" @@ -29,7 +30,6 @@ import ( "github.com/harness/gitness/internal/api/controller/space" "github.com/harness/gitness/internal/api/controller/user" controllerwebhook "github.com/harness/gitness/internal/api/controller/webhook" - "github.com/harness/gitness/internal/cron" gitevents "github.com/harness/gitness/internal/events/git" pullreqevents "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/server" @@ -58,7 +58,6 @@ func initSystem(ctx context.Context, config *gitnesstypes.Config) (*system, erro services.WireSet, cache.WireSet, server.WireSet, - cron.WireSet, url.WireSet, space.WireSet, repo.WireSet, @@ -88,6 +87,7 @@ func initSystem(ctx context.Context, config *gitnesstypes.Config) (*system, erro lock.WireSet, pubsub.WireSet, codecomments.WireSet, + gitrpccron.WireSet, ) return &system{}, nil } diff --git a/cli/server/harness.wire_gen.go b/cli/server/harness.wire_gen.go index 97d21df00..10cbc3a86 100644 --- a/cli/server/harness.wire_gen.go +++ b/cli/server/harness.wire_gen.go @@ -7,10 +7,10 @@ package server import ( "context" - "github.com/harness/gitness/events" "github.com/harness/gitness/gitrpc" server2 "github.com/harness/gitness/gitrpc/server" + "github.com/harness/gitness/gitrpc/server/cron" "github.com/harness/gitness/harness/auth/authn" "github.com/harness/gitness/harness/auth/authz" "github.com/harness/gitness/harness/bootstrap" @@ -27,7 +27,6 @@ import ( "github.com/harness/gitness/internal/api/controller/space" "github.com/harness/gitness/internal/api/controller/user" webhook2 "github.com/harness/gitness/internal/api/controller/webhook" - "github.com/harness/gitness/internal/cron" events3 "github.com/harness/gitness/internal/events/git" events2 "github.com/harness/gitness/internal/events/pullreq" router2 "github.com/harness/gitness/internal/router" @@ -177,7 +176,7 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { if err != nil { return nil, err } - nightly := cron.NewNightly() + cronManager := cron.ProvideCronManager(serverConfig) repoGitInfoView := database.ProvideRepoGitInfoView(db) repoGitInfoCache := cache.ProvideRepoGitInfoCache(repoGitInfoView) pubsubConfig := pubsub.ProvideConfig(config) @@ -187,6 +186,6 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { return nil, err } servicesServices := services.ProvideServices(webhookService, pullreqService) - serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, nightly, servicesServices) + serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, cronManager, servicesServices) return serverSystem, nil } diff --git a/cli/server/server.go b/cli/server/server.go index e545b919e..2622e9869 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -83,17 +83,16 @@ func (c *command) run(*kingpin.ParseContext) error { Stringer("version", version.Version). Msg("server started") - // start the purge routine. - g.Go(func() error { - system.nightly.Run(gCtx) - return nil - }) - log.Info().Msg("nightly subroutine started") - // start grpc server g.Go(system.gitRPCServer.Start) log.Info().Msg("gitrpc server started") + // run the gitrpc cron jobs + g.Go(func() error { + return system.gitRPCCronMngr.Run(ctx) + }) + log.Info().Msg("gitrpc cron manager subroutine started") + // wait until the error group context is done <-gCtx.Done() diff --git a/cli/server/standalone.wire.go b/cli/server/standalone.wire.go index a964f68e1..d289a2705 100644 --- a/cli/server/standalone.wire.go +++ b/cli/server/standalone.wire.go @@ -13,6 +13,7 @@ import ( "github.com/harness/gitness/events" "github.com/harness/gitness/gitrpc" gitrpcserver "github.com/harness/gitness/gitrpc/server" + gitrpccron "github.com/harness/gitness/gitrpc/server/cron" "github.com/harness/gitness/internal/api/controller/githook" "github.com/harness/gitness/internal/api/controller/principal" "github.com/harness/gitness/internal/api/controller/pullreq" @@ -25,7 +26,6 @@ import ( "github.com/harness/gitness/internal/auth/authn" "github.com/harness/gitness/internal/auth/authz" "github.com/harness/gitness/internal/bootstrap" - "github.com/harness/gitness/internal/cron" gitevents "github.com/harness/gitness/internal/events/git" pullreqevents "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/router" @@ -57,7 +57,6 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { pullreqservice.WireSet, services.WireSet, server.WireSet, - cron.WireSet, url.WireSet, space.WireSet, repo.WireSet, @@ -85,6 +84,7 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { lock.WireSet, pubsub.WireSet, codecomments.WireSet, + gitrpccron.WireSet, ) return &system{}, nil } diff --git a/cli/server/standalone.wire_gen.go b/cli/server/standalone.wire_gen.go index e633a9b92..1d1a48480 100644 --- a/cli/server/standalone.wire_gen.go +++ b/cli/server/standalone.wire_gen.go @@ -7,10 +7,10 @@ package server import ( "context" - "github.com/harness/gitness/events" "github.com/harness/gitness/gitrpc" server2 "github.com/harness/gitness/gitrpc/server" + "github.com/harness/gitness/gitrpc/server/cron" "github.com/harness/gitness/internal/api/controller/githook" "github.com/harness/gitness/internal/api/controller/principal" "github.com/harness/gitness/internal/api/controller/pullreq" @@ -23,7 +23,6 @@ import ( "github.com/harness/gitness/internal/auth/authn" "github.com/harness/gitness/internal/auth/authz" "github.com/harness/gitness/internal/bootstrap" - "github.com/harness/gitness/internal/cron" events3 "github.com/harness/gitness/internal/events/git" events2 "github.com/harness/gitness/internal/events/pullreq" "github.com/harness/gitness/internal/router" @@ -144,7 +143,7 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { if err != nil { return nil, err } - nightly := cron.NewNightly() + cronManager := cron.ProvideCronManager(serverConfig) repoGitInfoView := database.ProvideRepoGitInfoView(db) repoGitInfoCache := cache.ProvideRepoGitInfoCache(repoGitInfoView) pubsubConfig := pubsub.ProvideConfig(config) @@ -154,6 +153,6 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) { return nil, err } servicesServices := services.ProvideServices(webhookService, pullreqService) - serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, nightly, servicesServices) + serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, cronManager, servicesServices) return serverSystem, nil } diff --git a/cli/server/system.go b/cli/server/system.go index 971aef5e5..2cfb41bf4 100644 --- a/cli/server/system.go +++ b/cli/server/system.go @@ -6,29 +6,29 @@ package server import ( gitrpcserver "github.com/harness/gitness/gitrpc/server" + gitrpccron "github.com/harness/gitness/gitrpc/server/cron" "github.com/harness/gitness/internal/bootstrap" - "github.com/harness/gitness/internal/cron" "github.com/harness/gitness/internal/server" "github.com/harness/gitness/internal/services" ) // system stores high level system sub-routines. type system struct { - bootstrap bootstrap.Bootstrap - server *server.Server - gitRPCServer *gitrpcserver.Server - nightly *cron.Nightly - services services.Services + bootstrap bootstrap.Bootstrap + server *server.Server + gitRPCServer *gitrpcserver.Server + services services.Services + gitRPCCronMngr *gitrpccron.CronManager } // newSystem returns a new system structure. func newSystem(bootstrap bootstrap.Bootstrap, server *server.Server, gitRPCServer *gitrpcserver.Server, - nightly *cron.Nightly, services services.Services) *system { + gitrpccron *gitrpccron.CronManager, services services.Services) *system { return &system{ - bootstrap: bootstrap, - server: server, - gitRPCServer: gitRPCServer, - nightly: nightly, - services: services, + bootstrap: bootstrap, + server: server, + gitRPCServer: gitRPCServer, + services: services, + gitRPCCronMngr: gitrpccron, } } diff --git a/gitrpc/server/cron/clean_slate_data.go b/gitrpc/server/cron/clean_slate_data.go new file mode 100644 index 000000000..c1dd787bd --- /dev/null +++ b/gitrpc/server/cron/clean_slate_data.go @@ -0,0 +1,43 @@ +package cron + +import ( + "context" + "os" + "path" + "path/filepath" + + "github.com/harness/gitness/gitrpc/server" + "github.com/rs/zerolog/log" +) + +// cleanup repository graveyard +func cleanupRepoGraveyard(ctx context.Context, graveyardpath string) error { + logger := log.Ctx(ctx) + repolist, err := os.ReadDir(graveyardpath) + if err != nil { + logger.Warn().Err(err).Msgf("failed to read repos graveyard directory %s", graveyardpath) + return err + } + for _, repo := range repolist { + // exit early if context is cancelled + if ctx.Err() != nil { + return ctx.Err() + } + if err := os.RemoveAll(path.Join(graveyardpath, repo.Name())); err != nil { + logger.Error().Err(err).Msgf("failed to remove repository %s from graveyard", repo.Name()) + } else { + logger.Info().Msgf("repository %s removed from graveyard", repo.Name()) + } + } + return nil +} + +func AddAllGitRPCCronJobs(cm *CronManager, gitrpcconfig server.Config) error { + // periodic repository graveyard cleanup + graveyardpath := filepath.Join(gitrpcconfig.GitRoot, server.ReposGraveyardSubdirName) + err := cm.NewCronTask(Nightly, func(ctx context.Context) error { return cleanupRepoGraveyard(ctx, graveyardpath) }) + if err != nil { + return err + } + return nil +} diff --git a/gitrpc/server/cron/clean_slate_data_test.go b/gitrpc/server/cron/clean_slate_data_test.go new file mode 100644 index 000000000..a76dacc3a --- /dev/null +++ b/gitrpc/server/cron/clean_slate_data_test.go @@ -0,0 +1,31 @@ +package cron + +import ( + "context" + "io/ioutil" + "os" + "testing" +) + +func TestCleanupRepoGraveyardFunc(t *testing.T) { + ctx := context.Background() + tmpDir := t.TempDir() + //create a dummy repository + testRepo, _ := ioutil.TempDir(tmpDir, "TestRepo100") + err := cleanupRepoGraveyard(ctx, tmpDir) + if err != nil { + t.Error("cleanupRepoGraveyard failed") + } + if _, err := os.Stat(testRepo); !os.IsNotExist(err) { + t.Error("cleanupRepoGraveyard failed to remove the directory") + } +} + +func TestCleanupRepoGraveyardEmpty(t *testing.T) { + ctx := context.Background() + tmpDir := t.TempDir() + err := cleanupRepoGraveyard(ctx, tmpDir) + if err != nil { + t.Error("cleanupRepoGraveyard failed") + } +} diff --git a/gitrpc/server/cron/cronmanager.go b/gitrpc/server/cron/cronmanager.go new file mode 100644 index 000000000..4bd76dbf7 --- /dev/null +++ b/gitrpc/server/cron/cronmanager.go @@ -0,0 +1,80 @@ +package cron + +import ( + "context" + "errors" + "fmt" + + cron "github.com/robfig/cron/v3" + "github.com/rs/zerolog/log" +) + +const ( + //Format: seconds minute(0-59) hour(0-23) day of month(1-31) month(1-12) day of week(0-6) + Hourly = "0 0 * * * *" // once an hour at minute 0 + Nightly = "0 0 0 * * *" // once a day at midnight + Weekly = "0 0 0 * * 0" // once a week on Sun midnight + Monthly = "0 0 0 1 * *" // once a month on the first day of the month + EverySecond = "* * * * * *" // every second (for testing) +) + +var ErrFatal = errors.New("fatal error occured") + +type CronManager struct { + c *cron.Cron + ctx context.Context + cancel context.CancelFunc + fatal chan error +} + +// options could be location, logger, etc. +func NewCronManager() *CronManager { + return &CronManager{ + c: cron.New(cron.WithSeconds()), + fatal: make(chan error), + } +} + +// add a new func to cron job +func (c *CronManager) NewCronTask(sepc string, job func(ctx context.Context) error) error { + _, err := c.c.AddFunc(sepc, func() { + jerr := job(c.ctx) + if jerr != nil { // check different severity of errors + log.Ctx(c.ctx).Error().Err(jerr).Msg("gitrpc cron job failed") + + if errors.Is(jerr, ErrFatal) { + c.fatal <- jerr + return + } + } + }) + if err != nil { + return fmt.Errorf("gitrpc cron manager failed to add cron job function: %w", err) + } + return nil +} + +// Run the cron scheduler, or no-op if already running. +func (c *CronManager) Run(ctx context.Context) error { + c.ctx, c.cancel = context.WithCancel(ctx) + var err error + go func() { + select { + case <-ctx.Done(): + err = fmt.Errorf("context done: %w", ctx.Err()) + case fErr := <-c.fatal: + err = fmt.Errorf("fatal error occured: %w", fErr) + } + + // stop scheduling of new jobs. + // NOTE: doesn't wait for running jobs, but c.Run() does, and we don't have to wait here + _ = c.c.Stop() + + // cancel running jobs (redundant for ctx.Done(), but makes code simpler) + c.cancel() + }() + + c.c.Run() + close(c.fatal) + return err +} diff --git a/gitrpc/server/cron/cronmanager_test.go b/gitrpc/server/cron/cronmanager_test.go new file mode 100644 index 000000000..c93d40f14 --- /dev/null +++ b/gitrpc/server/cron/cronmanager_test.go @@ -0,0 +1,104 @@ +package cron + +import ( + "context" + "errors" + "fmt" + "testing" + "time" +) + +func run(cmngr *CronManager, ctx context.Context) chan error { + cron := make(chan error) + go func() { + cron <- cmngr.Run(ctx) + }() + return cron +} + +func TestCronManagerFatalErr(t *testing.T) { + cmngr := NewCronManager() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error { + return fmt.Errorf("inner: %w", ErrFatal) + }) + select { + case ferr := <-run(cmngr, ctx): + if ferr == nil { + t.Error("Cronmanager failed to receive fatal error") + } + case <-time.After(2 * time.Second): + t.Error("Cronmanager failed to stop after a fatal error") + } +} + +func TestCronManagerNonFatalErr(t *testing.T) { + cmngr := NewCronManager() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error { + return errors.New("dummy error") + }) + select { + case ferr := <-run(cmngr, ctx): + if ferr != nil { + t.Error("Cronmanager failed at a non fatal error") + } + case <-time.After(1500 * time.Microsecond): + // cron manager should keep running + } + +} +func TestCronManagerNewTask(t *testing.T) { + cmngr := NewCronManager() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a := 0 + // add a task + _ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error { + a = 1 + return nil + }) + + select { + case cerr := <-run(cmngr, ctx): + if cerr != nil { + t.Error("Cronmanager failed at Run:", cerr) + } + case <-time.After(1500 * time.Millisecond): + if a != 1 { + t.Error("Cronmanager failed to run the task") + } + } +} + +func TestCronManagerStopOnCtxCancel(t *testing.T) { + cmngr := NewCronManager() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error { + cancel() + return nil + }) + err := cmngr.Run(ctx) + if !errors.Is(err, context.Canceled) { + t.Error("Cronmanager failed to stop after ctx got canceled ", err) + } +} + +func TestCronManagerStopOnCtxTimeout(t *testing.T) { + cmngr := NewCronManager() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + _ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error { + time.Sleep(5 * time.Second) + return nil + }) + err := cmngr.Run(ctx) + if !errors.Is(err, context.DeadlineExceeded) { + t.Error("Cronmanager failed to stop after ctx timeout", err) + } +} diff --git a/gitrpc/server/cron/wire.go b/gitrpc/server/cron/wire.go new file mode 100644 index 000000000..586007702 --- /dev/null +++ b/gitrpc/server/cron/wire.go @@ -0,0 +1,15 @@ +package cron + +import ( + "github.com/google/wire" + "github.com/harness/gitness/gitrpc/server" +) + +// WireSet provides a wire set for this package. +var WireSet = wire.NewSet(ProvideCronManager) + +func ProvideCronManager(gitrpcconfig server.Config) *CronManager { + cmngr := NewCronManager() + _ = AddAllGitRPCCronJobs(cmngr, gitrpcconfig) + return cmngr +} diff --git a/gitrpc/server/server.go b/gitrpc/server/server.go index 17b0184bf..b7bb8fde3 100644 --- a/gitrpc/server/server.go +++ b/gitrpc/server/server.go @@ -25,7 +25,7 @@ import ( const ( repoSubdirName = "repos" - reposGraveyardSubdirName = "cleanup" + ReposGraveyardSubdirName = "cleanup" ) type Server struct { @@ -71,7 +71,7 @@ func NewServer(config Config) (*Server, error) { store := storage.NewLocalStore() // create a temp dir for deleted repositories // this dir should get cleaned up peridocally if it's not empty - reposGraveyard := filepath.Join(config.GitRoot, reposGraveyardSubdirName) + reposGraveyard := filepath.Join(config.GitRoot, ReposGraveyardSubdirName) if _, errdir := os.Stat(reposGraveyard); os.IsNotExist(errdir) { if errdir = os.MkdirAll(reposGraveyard, 0o700); errdir != nil { return nil, errdir diff --git a/go.mod b/go.mod index ca32ae73c..14814b923 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,15 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pquerna/otp v1.3.0 // indirect + github.com/prometheus/client_golang v1.12.1 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.32.1 // indirect + github.com/prometheus/procfs v0.7.3 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/robfig/cron/v3 v3.0.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect github.com/sergi/go-diff v1.2.0 // indirect github.com/swaggest/jsonschema-go v0.3.40 // indirect github.com/swaggest/refl v1.1.0 // indirect diff --git a/go.sum b/go.sum index 547fc84c6..693bee7dc 100644 --- a/go.sum +++ b/go.sum @@ -520,6 +520,13 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/mocks/mock_client.go b/mocks/mock_client.go index fbd985bd4..3216e04cb 100644 --- a/mocks/mock_client.go +++ b/mocks/mock_client.go @@ -8,10 +8,9 @@ import ( context "context" reflect "reflect" + gomock "github.com/golang/mock/gomock" user "github.com/harness/gitness/internal/api/controller/user" types "github.com/harness/gitness/types" - - gomock "github.com/golang/mock/gomock" ) // MockClient is a mock of Client interface. diff --git a/mocks/mock_store.go b/mocks/mock_store.go index 8ad9973bf..30b7ec550 100644 --- a/mocks/mock_store.go +++ b/mocks/mock_store.go @@ -8,10 +8,9 @@ import ( context "context" reflect "reflect" + gomock "github.com/golang/mock/gomock" types "github.com/harness/gitness/types" enum "github.com/harness/gitness/types/enum" - - gomock "github.com/golang/mock/gomock" ) // MockPrincipalStore is a mock of PrincipalStore interface.