mirror of
https://github.com/harness/drone.git
synced 2025-05-20 10:59:56 +08:00
Add context to pingDatabase and change logging behavior (#21)
Add cancelable context to our server CLI and adds canceling and improved logging to PingDatabase
This commit is contained in:
parent
d0cba5509e
commit
5baf42d5ca
@ -6,24 +6,38 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/harness/gitness/types"
|
||||
"github.com/harness/gitness/version"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
"github.com/mattn/go-isatty"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"gopkg.in/alecthomas/kingpin.v2"
|
||||
)
|
||||
|
||||
const (
|
||||
// GraceFullShutdownTime defines the max time we wait when shutting down a server.
|
||||
// 5min should be enough for most git clones to complete.
|
||||
GraceFullShutdownTime = 300 * time.Second
|
||||
)
|
||||
|
||||
type command struct {
|
||||
envfile string
|
||||
}
|
||||
|
||||
func (c *command) run(*kingpin.ParseContext) error {
|
||||
// Create context that listens for the interrupt signal from the OS.
|
||||
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
// load environment variables from file.
|
||||
err := godotenv.Load(c.envfile)
|
||||
if err != nil {
|
||||
@ -34,40 +48,57 @@ func (c *command) run(*kingpin.ParseContext) error {
|
||||
// data from the environment.
|
||||
config, err := load()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).
|
||||
Msg("cannot load configuration")
|
||||
return fmt.Errorf("encountered an error while loading configuration: %w", err)
|
||||
}
|
||||
|
||||
// configure the log level
|
||||
setupLogger(config)
|
||||
|
||||
system, err := initSystem(config)
|
||||
// initialize system
|
||||
system, err := initSystem(ctx, config)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).
|
||||
Msg("cannot boot server")
|
||||
return fmt.Errorf("encountered an error while initializing the system: %w", err)
|
||||
}
|
||||
|
||||
var g errgroup.Group
|
||||
// collects all go routines - gCTX cancels if any go routine encounters an error
|
||||
g, gCtx := errgroup.WithContext(ctx)
|
||||
|
||||
// starts the http server.
|
||||
g.Go(func() error {
|
||||
// start server
|
||||
gHTTP, shutdownHTTP := system.server.ListenAndServe()
|
||||
g.Go(gHTTP.Wait)
|
||||
log.Info().
|
||||
Str("port", config.Server.Bind).
|
||||
Str("revision", version.GitCommit).
|
||||
Str("repository", version.GitRepository).
|
||||
Stringer("version", version.Version).
|
||||
Msg("server started")
|
||||
return system.server.ListenAndServe(context.Background())
|
||||
})
|
||||
|
||||
// start the purge routine.
|
||||
g.Go(func() error {
|
||||
log.Debug().Msg("starting the nightly subroutine")
|
||||
system.nightly.Run(context.Background())
|
||||
system.nightly.Run(gCtx)
|
||||
return nil
|
||||
})
|
||||
log.Info().Msg("nightly subroutine started")
|
||||
|
||||
return g.Wait()
|
||||
// wait until the error group context is done
|
||||
<-gCtx.Done()
|
||||
|
||||
// restore default behavior on the interrupt signal and notify user of shutdown.
|
||||
stop()
|
||||
log.Info().Msg("shutting down gracefully (press Ctrl+C again to force)")
|
||||
|
||||
// shutdown servers gracefully
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), GraceFullShutdownTime)
|
||||
defer cancel()
|
||||
|
||||
if sErr := shutdownHTTP(shutdownCtx); sErr != nil {
|
||||
log.Err(sErr).Msg("failed to shutdown http server gracefully")
|
||||
}
|
||||
|
||||
log.Info().Msg("wait for subroutines to complete")
|
||||
err = g.Wait()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// helper function configures the global logger from
|
||||
|
@ -8,6 +8,8 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/harness/gitness/internal/auth/authn"
|
||||
"github.com/harness/gitness/internal/auth/authz"
|
||||
"github.com/harness/gitness/internal/cron"
|
||||
@ -20,7 +22,7 @@ import (
|
||||
"github.com/google/wire"
|
||||
)
|
||||
|
||||
func initSystem(config *types.Config) (*system, error) {
|
||||
func initSystem(ctx context.Context, config *types.Config) (*system, error) {
|
||||
wire.Build(
|
||||
database.WireSet,
|
||||
memory.WireSet,
|
||||
|
@ -7,6 +7,8 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/harness/gitness/internal/auth/authn"
|
||||
"github.com/harness/gitness/internal/auth/authz"
|
||||
"github.com/harness/gitness/internal/cron"
|
||||
@ -19,9 +21,9 @@ import (
|
||||
|
||||
// Injectors from wire.go:
|
||||
|
||||
func initSystem(config *types.Config) (*system, error) {
|
||||
func initSystem(ctx context.Context, config *types.Config) (*system, error) {
|
||||
systemStore := memory.New(config)
|
||||
db, err := database.ProvideDatabase(config)
|
||||
db, err := database.ProvideDatabase(ctx, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -15,6 +15,11 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
// ReadHeaderTimeout defines the max time the server waits for request headers.
|
||||
ReadHeaderTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
// A Server defines parameters for running an HTTP server.
|
||||
type Server struct {
|
||||
Acme bool
|
||||
@ -25,43 +30,43 @@ type Server struct {
|
||||
Handler http.Handler
|
||||
}
|
||||
|
||||
// ShutdownFunction defines a function that is called to shutdown the server.
|
||||
type ShutdownFunction func(context.Context) error
|
||||
|
||||
// ListenAndServe initializes a server to respond to HTTP network requests.
|
||||
func (s *Server) ListenAndServe(ctx context.Context) error {
|
||||
func (s *Server) ListenAndServe() (*errgroup.Group, ShutdownFunction) {
|
||||
if s.Acme {
|
||||
return s.listenAndServeAcme(ctx)
|
||||
return s.listenAndServeAcme()
|
||||
} else if s.Key != "" {
|
||||
return s.listenAndServeTLS(ctx)
|
||||
return s.listenAndServeTLS()
|
||||
}
|
||||
return s.listenAndServe(ctx)
|
||||
return s.listenAndServe()
|
||||
}
|
||||
|
||||
func (s *Server) listenAndServe(ctx context.Context) error {
|
||||
func (s *Server) listenAndServe() (*errgroup.Group, ShutdownFunction) {
|
||||
var g errgroup.Group
|
||||
s1 := &http.Server{
|
||||
Addr: s.Addr,
|
||||
ReadHeaderTimeout: 2 * time.Second,
|
||||
ReadHeaderTimeout: ReadHeaderTimeout,
|
||||
Handler: s.Handler,
|
||||
}
|
||||
g.Go(func() error {
|
||||
<-ctx.Done()
|
||||
return s1.Shutdown(ctx)
|
||||
})
|
||||
g.Go(func() error {
|
||||
return s1.ListenAndServe()
|
||||
})
|
||||
return g.Wait()
|
||||
|
||||
return &g, s1.Shutdown
|
||||
}
|
||||
|
||||
func (s *Server) listenAndServeTLS(ctx context.Context) error {
|
||||
func (s *Server) listenAndServeTLS() (*errgroup.Group, ShutdownFunction) {
|
||||
var g errgroup.Group
|
||||
s1 := &http.Server{
|
||||
Addr: ":http",
|
||||
ReadHeaderTimeout: 2 * time.Second,
|
||||
ReadHeaderTimeout: ReadHeaderTimeout,
|
||||
Handler: http.HandlerFunc(redirect),
|
||||
}
|
||||
s2 := &http.Server{
|
||||
Addr: ":https",
|
||||
ReadHeaderTimeout: 2 * time.Second,
|
||||
ReadHeaderTimeout: ReadHeaderTimeout,
|
||||
Handler: s.Handler,
|
||||
}
|
||||
g.Go(func() error {
|
||||
@ -73,20 +78,20 @@ func (s *Server) listenAndServeTLS(ctx context.Context) error {
|
||||
s.Key,
|
||||
)
|
||||
})
|
||||
g.Go(func() error {
|
||||
<-ctx.Done()
|
||||
if err := s1.Shutdown(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s2.Shutdown(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
return &g, func(ctx context.Context) error {
|
||||
var sg errgroup.Group
|
||||
sg.Go(func() error {
|
||||
return s1.Shutdown(ctx)
|
||||
})
|
||||
return g.Wait()
|
||||
sg.Go(func() error {
|
||||
return s2.Shutdown(ctx)
|
||||
})
|
||||
return sg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (s Server) listenAndServeAcme(ctx context.Context) error {
|
||||
func (s Server) listenAndServeAcme() (*errgroup.Group, ShutdownFunction) {
|
||||
var g errgroup.Group
|
||||
m := &autocert.Manager{
|
||||
Cache: autocert.DirCache(".cache"),
|
||||
@ -95,13 +100,13 @@ func (s Server) listenAndServeAcme(ctx context.Context) error {
|
||||
}
|
||||
s1 := &http.Server{
|
||||
Addr: ":http",
|
||||
ReadHeaderTimeout: 2 * time.Second,
|
||||
ReadHeaderTimeout: ReadHeaderTimeout,
|
||||
Handler: m.HTTPHandler(nil),
|
||||
}
|
||||
s2 := &http.Server{
|
||||
Addr: ":https",
|
||||
Handler: s.Handler,
|
||||
ReadHeaderTimeout: 2 * time.Second,
|
||||
ReadHeaderTimeout: ReadHeaderTimeout,
|
||||
TLSConfig: &tls.Config{
|
||||
MinVersion: tls.VersionTLS12,
|
||||
GetCertificate: m.GetCertificate,
|
||||
@ -114,17 +119,17 @@ func (s Server) listenAndServeAcme(ctx context.Context) error {
|
||||
g.Go(func() error {
|
||||
return s2.ListenAndServeTLS("", "")
|
||||
})
|
||||
g.Go(func() error {
|
||||
<-ctx.Done()
|
||||
if err := s1.Shutdown(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s2.Shutdown(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
return &g, func(ctx context.Context) error {
|
||||
var sg errgroup.Group
|
||||
sg.Go(func() error {
|
||||
return s1.Shutdown(ctx)
|
||||
})
|
||||
return g.Wait()
|
||||
sg.Go(func() error {
|
||||
return s2.Shutdown(ctx)
|
||||
})
|
||||
return sg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func redirect(w http.ResponseWriter, req *http.Request) {
|
||||
|
@ -15,9 +15,6 @@ import (
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// noContext is simple background context.
|
||||
var noContext = context.Background()
|
||||
|
||||
//go:embed postgres/*.sql
|
||||
var postgres embed.FS
|
||||
|
||||
@ -25,7 +22,7 @@ var postgres embed.FS
|
||||
var sqlite embed.FS
|
||||
|
||||
// Migrate performs the database migration.
|
||||
func Migrate(db *sqlx.DB) error {
|
||||
func Migrate(ctx context.Context, db *sqlx.DB) error {
|
||||
before := func(_ context.Context, _ *sql.Tx, version string) error {
|
||||
log.Trace().Str("version", version).Msg("migration started")
|
||||
return nil
|
||||
@ -54,5 +51,5 @@ func Migrate(db *sqlx.DB) error {
|
||||
opts.FS = folder
|
||||
}
|
||||
|
||||
return migrate.New(opts).MigrateUp(noContext)
|
||||
return migrate.New(opts).MigrateUp(ctx)
|
||||
}
|
||||
|
@ -7,11 +7,12 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/harness/gitness/internal/store/database/migrate"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"github.com/Masterminds/squirrel"
|
||||
@ -23,19 +24,19 @@ import (
|
||||
var builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
|
||||
|
||||
// Connect to a database and verify with a ping.
|
||||
func Connect(driver, datasource string) (*sqlx.DB, error) {
|
||||
func Connect(ctx context.Context, driver string, datasource string) (*sqlx.DB, error) {
|
||||
db, err := sql.Open(driver, datasource)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Failed to open the db")
|
||||
return nil, fmt.Errorf("failed to open the db: %w", err)
|
||||
}
|
||||
|
||||
dbx := sqlx.NewDb(db, driver)
|
||||
if err = pingDatabase(dbx); err != nil {
|
||||
return nil, errors.Wrap(err, "Failed to ping the db")
|
||||
if err = pingDatabase(ctx, dbx); err != nil {
|
||||
return nil, fmt.Errorf("failed to ping the db: %w", err)
|
||||
}
|
||||
|
||||
if err = setupDatabase(dbx); err != nil {
|
||||
return nil, errors.Wrap(err, "Failed to setup the db")
|
||||
if err = setupDatabase(ctx, dbx); err != nil {
|
||||
return nil, fmt.Errorf("failed to setup the db: %w", err)
|
||||
}
|
||||
|
||||
return dbx, nil
|
||||
@ -53,26 +54,26 @@ func Must(db *sqlx.DB, err error) *sqlx.DB {
|
||||
// helper function to ping the database with backoff to ensure
|
||||
// a connection can be established before we proceed with the
|
||||
// database setup and migration.
|
||||
func pingDatabase(db *sqlx.DB) error {
|
||||
func pingDatabase(ctx context.Context, db *sqlx.DB) error {
|
||||
var err error
|
||||
for i := 0; i < 30; i++ {
|
||||
err = db.Ping()
|
||||
for i := 1; i <= 30; i++ {
|
||||
err = db.PingContext(ctx)
|
||||
|
||||
// We can complete on first successful ping
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Err(err).Msgf("Ping attempt #%d failed", i+1)
|
||||
log.Debug().Err(err).Msgf("Ping attempt #%d failed", i)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
return err
|
||||
return fmt.Errorf("all 30 tries failed, last failure: %w", err)
|
||||
}
|
||||
|
||||
// helper function to setup the database by performing automated
|
||||
// database migration steps.
|
||||
func setupDatabase(db *sqlx.DB) error {
|
||||
return migrate.Migrate(db)
|
||||
func setupDatabase(ctx context.Context, db *sqlx.DB) error {
|
||||
return migrate.Migrate(ctx, db)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
|
||||
@ -24,7 +25,7 @@ func connect() (*sqlx.DB, error) {
|
||||
driver = os.Getenv("DATABASE_DRIVER")
|
||||
config = os.Getenv("DATABASE_CONFIG")
|
||||
}
|
||||
return Connect(driver, config)
|
||||
return Connect(context.Background(), driver, config)
|
||||
}
|
||||
|
||||
// seed seed the database state.
|
||||
|
@ -5,6 +5,8 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
@ -27,8 +29,9 @@ var WireSet = wire.NewSet(
|
||||
)
|
||||
|
||||
// ProvideDatabase provides a database connection.
|
||||
func ProvideDatabase(config *types.Config) (*sqlx.DB, error) {
|
||||
func ProvideDatabase(ctx context.Context, config *types.Config) (*sqlx.DB, error) {
|
||||
return Connect(
|
||||
ctx,
|
||||
config.Database.Driver,
|
||||
config.Database.Datasource,
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user