code195 repo delete cleanup, and added a cron manager for cronjobs and unit tests

This commit is contained in:
atefeh 2023-03-30 19:25:55 -07:00
parent 2c40805936
commit 8b335d1573
16 changed files with 321 additions and 37 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/harness/gitness/events"
"github.com/harness/gitness/gitrpc"
gitrpcserver "github.com/harness/gitness/gitrpc/server"
gitrpccron "github.com/harness/gitness/gitrpc/server/cron"
"github.com/harness/gitness/harness/auth/authn"
"github.com/harness/gitness/harness/auth/authz"
"github.com/harness/gitness/harness/bootstrap"
@ -29,7 +30,6 @@ import (
"github.com/harness/gitness/internal/api/controller/space"
"github.com/harness/gitness/internal/api/controller/user"
controllerwebhook "github.com/harness/gitness/internal/api/controller/webhook"
"github.com/harness/gitness/internal/cron"
gitevents "github.com/harness/gitness/internal/events/git"
pullreqevents "github.com/harness/gitness/internal/events/pullreq"
"github.com/harness/gitness/internal/server"
@ -58,7 +58,6 @@ func initSystem(ctx context.Context, config *gitnesstypes.Config) (*system, erro
services.WireSet,
cache.WireSet,
server.WireSet,
cron.WireSet,
url.WireSet,
space.WireSet,
repo.WireSet,
@ -88,6 +87,7 @@ func initSystem(ctx context.Context, config *gitnesstypes.Config) (*system, erro
lock.WireSet,
pubsub.WireSet,
codecomments.WireSet,
gitrpccron.WireSet,
)
return &system{}, nil
}

View File

@ -7,10 +7,10 @@ package server
import (
"context"
"github.com/harness/gitness/events"
"github.com/harness/gitness/gitrpc"
server2 "github.com/harness/gitness/gitrpc/server"
"github.com/harness/gitness/gitrpc/server/cron"
"github.com/harness/gitness/harness/auth/authn"
"github.com/harness/gitness/harness/auth/authz"
"github.com/harness/gitness/harness/bootstrap"
@ -27,7 +27,6 @@ import (
"github.com/harness/gitness/internal/api/controller/space"
"github.com/harness/gitness/internal/api/controller/user"
webhook2 "github.com/harness/gitness/internal/api/controller/webhook"
"github.com/harness/gitness/internal/cron"
events3 "github.com/harness/gitness/internal/events/git"
events2 "github.com/harness/gitness/internal/events/pullreq"
router2 "github.com/harness/gitness/internal/router"
@ -177,7 +176,7 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) {
if err != nil {
return nil, err
}
nightly := cron.NewNightly()
cronManager := cron.ProvideCronManager(serverConfig)
repoGitInfoView := database.ProvideRepoGitInfoView(db)
repoGitInfoCache := cache.ProvideRepoGitInfoCache(repoGitInfoView)
pubsubConfig := pubsub.ProvideConfig(config)
@ -187,6 +186,6 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) {
return nil, err
}
servicesServices := services.ProvideServices(webhookService, pullreqService)
serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, nightly, servicesServices)
serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, cronManager, servicesServices)
return serverSystem, nil
}

View File

@ -83,17 +83,16 @@ func (c *command) run(*kingpin.ParseContext) error {
Stringer("version", version.Version).
Msg("server started")
// start the purge routine.
g.Go(func() error {
system.nightly.Run(gCtx)
return nil
})
log.Info().Msg("nightly subroutine started")
// start grpc server
g.Go(system.gitRPCServer.Start)
log.Info().Msg("gitrpc server started")
// run the gitrpc cron jobs
g.Go(func() error {
return system.gitRPCCronMngr.Run(ctx)
})
log.Info().Msg("gitrpc cron manager subroutine started")
// wait until the error group context is done
<-gCtx.Done()

View File

@ -13,6 +13,7 @@ import (
"github.com/harness/gitness/events"
"github.com/harness/gitness/gitrpc"
gitrpcserver "github.com/harness/gitness/gitrpc/server"
gitrpccron "github.com/harness/gitness/gitrpc/server/cron"
"github.com/harness/gitness/internal/api/controller/githook"
"github.com/harness/gitness/internal/api/controller/principal"
"github.com/harness/gitness/internal/api/controller/pullreq"
@ -25,7 +26,6 @@ import (
"github.com/harness/gitness/internal/auth/authn"
"github.com/harness/gitness/internal/auth/authz"
"github.com/harness/gitness/internal/bootstrap"
"github.com/harness/gitness/internal/cron"
gitevents "github.com/harness/gitness/internal/events/git"
pullreqevents "github.com/harness/gitness/internal/events/pullreq"
"github.com/harness/gitness/internal/router"
@ -57,7 +57,6 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) {
pullreqservice.WireSet,
services.WireSet,
server.WireSet,
cron.WireSet,
url.WireSet,
space.WireSet,
repo.WireSet,
@ -85,6 +84,7 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) {
lock.WireSet,
pubsub.WireSet,
codecomments.WireSet,
gitrpccron.WireSet,
)
return &system{}, nil
}

View File

@ -7,10 +7,10 @@ package server
import (
"context"
"github.com/harness/gitness/events"
"github.com/harness/gitness/gitrpc"
server2 "github.com/harness/gitness/gitrpc/server"
"github.com/harness/gitness/gitrpc/server/cron"
"github.com/harness/gitness/internal/api/controller/githook"
"github.com/harness/gitness/internal/api/controller/principal"
"github.com/harness/gitness/internal/api/controller/pullreq"
@ -23,7 +23,6 @@ import (
"github.com/harness/gitness/internal/auth/authn"
"github.com/harness/gitness/internal/auth/authz"
"github.com/harness/gitness/internal/bootstrap"
"github.com/harness/gitness/internal/cron"
events3 "github.com/harness/gitness/internal/events/git"
events2 "github.com/harness/gitness/internal/events/pullreq"
"github.com/harness/gitness/internal/router"
@ -144,7 +143,7 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) {
if err != nil {
return nil, err
}
nightly := cron.NewNightly()
cronManager := cron.ProvideCronManager(serverConfig)
repoGitInfoView := database.ProvideRepoGitInfoView(db)
repoGitInfoCache := cache.ProvideRepoGitInfoCache(repoGitInfoView)
pubsubConfig := pubsub.ProvideConfig(config)
@ -154,6 +153,6 @@ func initSystem(ctx context.Context, config *types.Config) (*system, error) {
return nil, err
}
servicesServices := services.ProvideServices(webhookService, pullreqService)
serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, nightly, servicesServices)
serverSystem := newSystem(bootstrapBootstrap, serverServer, server3, cronManager, servicesServices)
return serverSystem, nil
}

View File

@ -6,29 +6,29 @@ package server
import (
gitrpcserver "github.com/harness/gitness/gitrpc/server"
gitrpccron "github.com/harness/gitness/gitrpc/server/cron"
"github.com/harness/gitness/internal/bootstrap"
"github.com/harness/gitness/internal/cron"
"github.com/harness/gitness/internal/server"
"github.com/harness/gitness/internal/services"
)
// system stores high level system sub-routines.
type system struct {
bootstrap bootstrap.Bootstrap
server *server.Server
gitRPCServer *gitrpcserver.Server
nightly *cron.Nightly
services services.Services
bootstrap bootstrap.Bootstrap
server *server.Server
gitRPCServer *gitrpcserver.Server
services services.Services
gitRPCCronMngr *gitrpccron.CronManager
}
// newSystem returns a new system structure.
func newSystem(bootstrap bootstrap.Bootstrap, server *server.Server, gitRPCServer *gitrpcserver.Server,
nightly *cron.Nightly, services services.Services) *system {
gitrpccron *gitrpccron.CronManager, services services.Services) *system {
return &system{
bootstrap: bootstrap,
server: server,
gitRPCServer: gitRPCServer,
nightly: nightly,
services: services,
bootstrap: bootstrap,
server: server,
gitRPCServer: gitRPCServer,
services: services,
gitRPCCronMngr: gitrpccron,
}
}

View File

@ -0,0 +1,43 @@
package cron
import (
"context"
"os"
"path"
"path/filepath"
"github.com/harness/gitness/gitrpc/server"
"github.com/rs/zerolog/log"
)
// cleanup repository graveyard
func cleanupRepoGraveyard(ctx context.Context, graveyardpath string) error {
logger := log.Ctx(ctx)
repolist, err := os.ReadDir(graveyardpath)
if err != nil {
logger.Warn().Err(err).Msgf("failed to read repos graveyard directory %s", graveyardpath)
return err
}
for _, repo := range repolist {
// exit early if context is cancelled
if ctx.Err() != nil {
return ctx.Err()
}
if err := os.RemoveAll(path.Join(graveyardpath, repo.Name())); err != nil {
logger.Error().Err(err).Msgf("failed to remove repository %s from graveyard", repo.Name())
} else {
logger.Info().Msgf("repository %s removed from graveyard", repo.Name())
}
}
return nil
}
func AddAllGitRPCCronJobs(cm *CronManager, gitrpcconfig server.Config) error {
// periodic repository graveyard cleanup
graveyardpath := filepath.Join(gitrpcconfig.GitRoot, server.ReposGraveyardSubdirName)
err := cm.NewCronTask(Nightly, func(ctx context.Context) error { return cleanupRepoGraveyard(ctx, graveyardpath) })
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,31 @@
package cron
import (
"context"
"io/ioutil"
"os"
"testing"
)
func TestCleanupRepoGraveyardFunc(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()
//create a dummy repository
testRepo, _ := ioutil.TempDir(tmpDir, "TestRepo100")
err := cleanupRepoGraveyard(ctx, tmpDir)
if err != nil {
t.Error("cleanupRepoGraveyard failed")
}
if _, err := os.Stat(testRepo); !os.IsNotExist(err) {
t.Error("cleanupRepoGraveyard failed to remove the directory")
}
}
func TestCleanupRepoGraveyardEmpty(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()
err := cleanupRepoGraveyard(ctx, tmpDir)
if err != nil {
t.Error("cleanupRepoGraveyard failed")
}
}

View File

@ -0,0 +1,80 @@
package cron
import (
"context"
"errors"
"fmt"
cron "github.com/robfig/cron/v3"
"github.com/rs/zerolog/log"
)
const (
//Format: seconds minute(0-59) hour(0-23) day of month(1-31) month(1-12) day of week(0-6)
Hourly = "0 0 * * * *" // once an hour at minute 0
Nightly = "0 0 0 * * *" // once a day at midnight
Weekly = "0 0 0 * * 0" // once a week on Sun midnight
Monthly = "0 0 0 1 * *" // once a month on the first day of the month
EverySecond = "* * * * * *" // every second (for testing)
)
var ErrFatal = errors.New("fatal error occured")
type CronManager struct {
c *cron.Cron
ctx context.Context
cancel context.CancelFunc
fatal chan error
}
// options could be location, logger, etc.
func NewCronManager() *CronManager {
return &CronManager{
c: cron.New(cron.WithSeconds()),
fatal: make(chan error),
}
}
// add a new func to cron job
func (c *CronManager) NewCronTask(sepc string, job func(ctx context.Context) error) error {
_, err := c.c.AddFunc(sepc, func() {
jerr := job(c.ctx)
if jerr != nil { // check different severity of errors
log.Ctx(c.ctx).Error().Err(jerr).Msg("gitrpc cron job failed")
if errors.Is(jerr, ErrFatal) {
c.fatal <- jerr
return
}
}
})
if err != nil {
return fmt.Errorf("gitrpc cron manager failed to add cron job function: %w", err)
}
return nil
}
// Run the cron scheduler, or no-op if already running.
func (c *CronManager) Run(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
var err error
go func() {
select {
case <-ctx.Done():
err = fmt.Errorf("context done: %w", ctx.Err())
case fErr := <-c.fatal:
err = fmt.Errorf("fatal error occured: %w", fErr)
}
// stop scheduling of new jobs.
// NOTE: doesn't wait for running jobs, but c.Run() does, and we don't have to wait here
_ = c.c.Stop()
// cancel running jobs (redundant for ctx.Done(), but makes code simpler)
c.cancel()
}()
c.c.Run()
close(c.fatal)
return err
}

View File

@ -0,0 +1,104 @@
package cron
import (
"context"
"errors"
"fmt"
"testing"
"time"
)
func run(cmngr *CronManager, ctx context.Context) chan error {
cron := make(chan error)
go func() {
cron <- cmngr.Run(ctx)
}()
return cron
}
func TestCronManagerFatalErr(t *testing.T) {
cmngr := NewCronManager()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error {
return fmt.Errorf("inner: %w", ErrFatal)
})
select {
case ferr := <-run(cmngr, ctx):
if ferr == nil {
t.Error("Cronmanager failed to receive fatal error")
}
case <-time.After(2 * time.Second):
t.Error("Cronmanager failed to stop after a fatal error")
}
}
func TestCronManagerNonFatalErr(t *testing.T) {
cmngr := NewCronManager()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error {
return errors.New("dummy error")
})
select {
case ferr := <-run(cmngr, ctx):
if ferr != nil {
t.Error("Cronmanager failed at a non fatal error")
}
case <-time.After(1500 * time.Microsecond):
// cron manager should keep running
}
}
func TestCronManagerNewTask(t *testing.T) {
cmngr := NewCronManager()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := 0
// add a task
_ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error {
a = 1
return nil
})
select {
case cerr := <-run(cmngr, ctx):
if cerr != nil {
t.Error("Cronmanager failed at Run:", cerr)
}
case <-time.After(1500 * time.Millisecond):
if a != 1 {
t.Error("Cronmanager failed to run the task")
}
}
}
func TestCronManagerStopOnCtxCancel(t *testing.T) {
cmngr := NewCronManager()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error {
cancel()
return nil
})
err := cmngr.Run(ctx)
if !errors.Is(err, context.Canceled) {
t.Error("Cronmanager failed to stop after ctx got canceled ", err)
}
}
func TestCronManagerStopOnCtxTimeout(t *testing.T) {
cmngr := NewCronManager()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_ = cmngr.NewCronTask(EverySecond, func(ctx context.Context) error {
time.Sleep(5 * time.Second)
return nil
})
err := cmngr.Run(ctx)
if !errors.Is(err, context.DeadlineExceeded) {
t.Error("Cronmanager failed to stop after ctx timeout", err)
}
}

View File

@ -0,0 +1,15 @@
package cron
import (
"github.com/google/wire"
"github.com/harness/gitness/gitrpc/server"
)
// WireSet provides a wire set for this package.
var WireSet = wire.NewSet(ProvideCronManager)
func ProvideCronManager(gitrpcconfig server.Config) *CronManager {
cmngr := NewCronManager()
_ = AddAllGitRPCCronJobs(cmngr, gitrpcconfig)
return cmngr
}

View File

@ -25,7 +25,7 @@ import (
const (
repoSubdirName = "repos"
reposGraveyardSubdirName = "cleanup"
ReposGraveyardSubdirName = "cleanup"
)
type Server struct {
@ -71,7 +71,7 @@ func NewServer(config Config) (*Server, error) {
store := storage.NewLocalStore()
// create a temp dir for deleted repositories
// this dir should get cleaned up peridocally if it's not empty
reposGraveyard := filepath.Join(config.GitRoot, reposGraveyardSubdirName)
reposGraveyard := filepath.Join(config.GitRoot, ReposGraveyardSubdirName)
if _, errdir := os.Stat(reposGraveyard); os.IsNotExist(errdir) {
if errdir = os.MkdirAll(reposGraveyard, 0o700); errdir != nil {
return nil, errdir

9
go.mod
View File

@ -93,6 +93,15 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pquerna/otp v1.3.0 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/swaggest/jsonschema-go v0.3.40 // indirect
github.com/swaggest/refl v1.1.0 // indirect

7
go.sum
View File

@ -520,6 +520,13 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=

View File

@ -8,10 +8,9 @@ import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
user "github.com/harness/gitness/internal/api/controller/user"
types "github.com/harness/gitness/types"
gomock "github.com/golang/mock/gomock"
)
// MockClient is a mock of Client interface.

View File

@ -8,10 +8,9 @@ import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
types "github.com/harness/gitness/types"
enum "github.com/harness/gitness/types/enum"
gomock "github.com/golang/mock/gomock"
)
// MockPrincipalStore is a mock of PrincipalStore interface.