address comments

This commit is contained in:
Vistaar Juneja 2023-08-16 20:33:32 +01:00
parent 13ae2a0361
commit 1f06534259
18 changed files with 48 additions and 53 deletions

View File

@ -38,12 +38,13 @@ func (c *Controller) Find(
execution, err := c.executionStore.Find(ctx, pipeline.ID, executionNum) execution, err := c.executionStore.Find(ctx, pipeline.ID, executionNum)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not find execution: %w", err) return nil, fmt.Errorf("could not find execution %d: %w", executionNum, err)
} }
stages, err := c.stageStore.ListSteps(ctx, execution.ID) stages, err := c.stageStore.ListWithSteps(ctx, execution.ID)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not query stage information: %w", err) return nil, fmt.Errorf("could not query stage information for execution %d: %w",
executionNum, err)
} }
// Add stages information to the execution // Add stages information to the execution

View File

@ -43,12 +43,12 @@ func (c *Controller) Find(
return nil, fmt.Errorf("could not find execution: %w", err) return nil, fmt.Errorf("could not find execution: %w", err)
} }
stage, err := c.stageStore.FindNumber(ctx, execution.ID, stageNum) stage, err := c.stageStore.FindByNumber(ctx, execution.ID, stageNum)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not find stage: %w", err) return nil, fmt.Errorf("could not find stage: %w", err)
} }
step, err := c.stepStore.FindNumber(ctx, stage.ID, stepNum) step, err := c.stepStore.FindByNumber(ctx, stage.ID, stepNum)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not find step: %w", err) return nil, fmt.Errorf("could not find step: %w", err)
} }

View File

@ -43,12 +43,12 @@ func (c *Controller) Tail(
return nil, nil, fmt.Errorf("could not find execution: %w", err) return nil, nil, fmt.Errorf("could not find execution: %w", err)
} }
stage, err := c.stageStore.FindNumber(ctx, execution.ID, stageNum) stage, err := c.stageStore.FindByNumber(ctx, execution.ID, stageNum)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("could not find stage: %w", err) return nil, nil, fmt.Errorf("could not find stage: %w", err)
} }
step, err := c.stepStore.FindNumber(ctx, stage.ID, stepNum) step, err := c.stepStore.FindByNumber(ctx, stage.ID, stepNum)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("could not find step: %w", err) return nil, nil, fmt.Errorf("could not find step: %w", err)
} }

View File

@ -51,9 +51,9 @@ func HandleFind(logCtrl *logs.Controller) http.HandlerFunc {
render.TranslatedUserError(w, err) render.TranslatedUserError(w, err)
return return
} }
defer rc.Close()
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
io.Copy(w, rc) io.Copy(w, rc)
rc.Close()
} }
} }

View File

@ -15,6 +15,7 @@ import (
"github.com/harness/gitness/internal/api/render" "github.com/harness/gitness/internal/api/render"
"github.com/harness/gitness/internal/api/request" "github.com/harness/gitness/internal/api/request"
"github.com/harness/gitness/internal/paths" "github.com/harness/gitness/internal/paths"
"github.com/rs/zerolog/log"
) )
var ( var (
@ -54,6 +55,8 @@ func HandleTail(logCtrl *logs.Controller) http.HandlerFunc {
f, ok := w.(http.Flusher) f, ok := w.(http.Flusher)
if !ok { if !ok {
log.Error().Msg("http writer type assertion failed")
render.InternalError(w)
return return
} }
@ -81,12 +84,11 @@ func HandleTail(logCtrl *logs.Controller) http.HandlerFunc {
h.Set("X-Accel-Buffering", "no") h.Set("X-Accel-Buffering", "no")
h.Set("Access-Control-Allow-Origin", "*") h.Set("Access-Control-Allow-Origin", "*")
ctx, cancel := context.WithCancel(r.Context()) ctx, cancel := context.WithTimeout(r.Context(), tailMaxTime)
defer cancel() defer cancel()
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
tailMaxTimeTimer := time.After(tailMaxTime)
msgDelayTimer := time.NewTimer(pingInterval) // if time b/w messages takes longer, send a ping msgDelayTimer := time.NewTimer(pingInterval) // if time b/w messages takes longer, send a ping
defer msgDelayTimer.Stop() defer msgDelayTimer.Stop()
L: L:
@ -97,8 +99,6 @@ func HandleTail(logCtrl *logs.Controller) http.HandlerFunc {
break L break L
case <-errc: case <-errc:
break L break L
case <-tailMaxTimeTimer:
break L
case <-msgDelayTimer.C: case <-msgDelayTimer.C:
io.WriteString(w, ": ping\n\n") io.WriteString(w, ": ping\n\n")
f.Flush() f.Flush()

View File

@ -532,19 +532,19 @@ type (
// where the stage is incomplete (pending or running). // where the stage is incomplete (pending or running).
ListIncomplete(ctx context.Context) ([]*types.Stage, error) ListIncomplete(ctx context.Context) ([]*types.Stage, error)
// ListSteps returns a stage list from the datastore corresponding to an execution, // ListWithSteps returns a stage list from the datastore corresponding to an execution,
// with the individual steps included. // with the individual steps included.
ListSteps(ctx context.Context, executionID int64) ([]*types.Stage, error) ListWithSteps(ctx context.Context, executionID int64) ([]*types.Stage, error)
// Find returns a build stage from the datastore by ID. // Find returns a build stage from the datastore by ID.
Find(ctx context.Context, stageID int64) (*types.Stage, error) Find(ctx context.Context, stageID int64) (*types.Stage, error)
// FindNumber returns a stage from the datastore by number. // FindByNumber returns a stage from the datastore by number.
FindNumber(ctx context.Context, executionID int64, stageNum int) (*types.Stage, error) FindByNumber(ctx context.Context, executionID int64, stageNum int) (*types.Stage, error)
} }
StepStore interface { StepStore interface {
// FindNumber returns a step from the datastore by number. // FindByNumber returns a step from the datastore by number.
FindNumber(ctx context.Context, stageID int64, stepNum int) (*types.Step, error) FindByNumber(ctx context.Context, stageID int64, stepNum int) (*types.Step, error)
} }
) )

View File

@ -10,12 +10,12 @@ import (
sqlx "github.com/jmoiron/sqlx/types" sqlx "github.com/jmoiron/sqlx/types"
) )
// EncodeToJSON accepts a generic parameter and returns // EncodeToSQLXJSON accepts a generic parameter and returns
// a sqlx.JSONText object which is used to store arbitrary // a sqlx.JSONText object which is used to store arbitrary
// data in the DB. We absorb the error here as the value // data in the DB. We absorb the error here as the value
// gets absorbed in sqlx.JSONText in case of UnsupportedValueError // gets absorbed in sqlx.JSONText in case of UnsupportedValueError
// or UnsupportedTypeError. // or UnsupportedTypeError.
func EncodeToJSON(v any) sqlx.JSONText { func EncodeToSQLXJSON(v any) sqlx.JSONText {
raw, _ := json.Marshal(v) raw, _ := json.Marshal(v)
return sqlx.JSONText(raw) return sqlx.JSONText(raw)
} }

View File

@ -258,7 +258,7 @@ func (s *executionStore) Update(ctx context.Context, e *types.Execution) error {
m, err := mapInternalToExecution(execution) m, err := mapInternalToExecution(execution)
if err != nil { if err != nil {
return database.ProcessSQLErrorf(err, "Could not map execution object") return fmt.Errorf("Could not map execution object: %w", err)
} }
*e = *m *e = *m
e.Version = execution.Version e.Version = execution.Version

View File

@ -78,7 +78,7 @@ func mapExecutionToInternal(in *types.Execution) *execution {
AuthorEmail: in.AuthorEmail, AuthorEmail: in.AuthorEmail,
AuthorAvatar: in.AuthorAvatar, AuthorAvatar: in.AuthorAvatar,
Sender: in.Sender, Sender: in.Sender,
Params: EncodeToJSON(in.Params), Params: EncodeToSQLXJSON(in.Params),
Cron: in.Cron, Cron: in.Cron,
Deploy: in.Deploy, Deploy: in.Deploy,
DeployID: in.DeployID, DeployID: in.DeployID,

View File

@ -3,7 +3,7 @@ DROP TABLE executions;
DROP TABLE stages; DROP TABLE stages;
DROP TABLE steps; DROP TABLE steps;
DROP TABLE logs; DROP TABLE logs;
CREATE TABLE IF NOT EXISTS pipelines ( CREATE TABLE pipelines (
pipeline_id INTEGER PRIMARY KEY AUTOINCREMENT pipeline_id INTEGER PRIMARY KEY AUTOINCREMENT
,pipeline_description TEXT NOT NULL ,pipeline_description TEXT NOT NULL
,pipeline_space_id INTEGER NOT NULL ,pipeline_space_id INTEGER NOT NULL
@ -34,7 +34,7 @@ CREATE TABLE IF NOT EXISTS pipelines (
ON DELETE CASCADE ON DELETE CASCADE
); );
CREATE TABLE IF NOT EXISTS executions ( CREATE TABLE executions (
execution_id INTEGER PRIMARY KEY AUTOINCREMENT execution_id INTEGER PRIMARY KEY AUTOINCREMENT
,execution_pipeline_id INTEGER NOT NULL ,execution_pipeline_id INTEGER NOT NULL
,execution_repo_id INTEGER NOT NULL ,execution_repo_id INTEGER NOT NULL
@ -87,7 +87,7 @@ CREATE TABLE IF NOT EXISTS executions (
ON DELETE CASCADE ON DELETE CASCADE
); );
CREATE TABLE IF NOT EXISTS secrets ( CREATE TABLE secrets (
secret_id INTEGER PRIMARY KEY AUTOINCREMENT secret_id INTEGER PRIMARY KEY AUTOINCREMENT
,secret_uid TEXT NOT NULL ,secret_uid TEXT NOT NULL
,secret_space_id INTEGER NOT NULL ,secret_space_id INTEGER NOT NULL
@ -107,7 +107,7 @@ CREATE TABLE IF NOT EXISTS secrets (
ON DELETE CASCADE ON DELETE CASCADE
); );
CREATE TABLE IF NOT EXISTS stages ( CREATE TABLE stages (
stage_id INTEGER PRIMARY KEY AUTOINCREMENT stage_id INTEGER PRIMARY KEY AUTOINCREMENT
,stage_execution_id INTEGER NOT NULL ,stage_execution_id INTEGER NOT NULL
,stage_number INTEGER NOT NULL ,stage_number INTEGER NOT NULL
@ -146,17 +146,13 @@ CREATE TABLE IF NOT EXISTS stages (
ON DELETE CASCADE ON DELETE CASCADE
); );
-- name: create-index-stages-build
CREATE INDEX IF NOT EXISTS ix_stages_build ON stages (stage_execution_id);
-- name: create-index-stages-status -- name: create-index-stages-status
CREATE INDEX IF NOT EXISTS ix_stage_in_progress ON stages (stage_status) CREATE INDEX ix_stage_in_progress ON stages (stage_status)
WHERE stage_status IN ('pending', 'running'); WHERE stage_status IN ('pending', 'running');
CREATE TABLE IF NOT EXISTS steps ( CREATE TABLE steps (
step_id INTEGER PRIMARY KEY AUTOINCREMENT step_id INTEGER PRIMARY KEY AUTOINCREMENT
,step_stage_id INTEGER NOT NULL ,step_stage_id INTEGER NOT NULL
,step_number INTEGER NOT NULL ,step_number INTEGER NOT NULL
@ -185,7 +181,7 @@ CREATE TABLE IF NOT EXISTS steps (
); );
CREATE TABLE IF NOT EXISTS logs ( CREATE TABLE logs (
log_id INTEGER PRIMARY KEY log_id INTEGER PRIMARY KEY
,log_data BLOB NOT NULL ,log_data BLOB NOT NULL

View File

@ -89,8 +89,8 @@ type stageStore struct {
db *sqlx.DB db *sqlx.DB
} }
// FindNumbers returns a stage given an execution ID and a stage number. // FindByNumber returns a stage given an execution ID and a stage number.
func (s *stageStore) FindNumber(ctx context.Context, executionID int64, stageNum int) (*types.Stage, error) { func (s *stageStore) FindByNumber(ctx context.Context, executionID int64, stageNum int) (*types.Stage, error) {
const findQueryStmt = ` const findQueryStmt = `
SELECT` + stageColumns + ` SELECT` + stageColumns + `
FROM stages FROM stages
@ -104,8 +104,8 @@ func (s *stageStore) FindNumber(ctx context.Context, executionID int64, stageNum
return mapInternalToStage(dst) return mapInternalToStage(dst)
} }
// ListSteps returns a stage with information about all its containing steps. // ListWithSteps returns a stage with information about all its containing steps.
func (s *stageStore) ListSteps(ctx context.Context, executionID int64) ([]*types.Stage, error) { func (s *stageStore) ListWithSteps(ctx context.Context, executionID int64) ([]*types.Stage, error) {
const queryNumberWithSteps = ` const queryNumberWithSteps = `
SELECT` + stageColumns + "," + stepColumns + ` SELECT` + stageColumns + "," + stepColumns + `
FROM stages FROM stages
@ -142,8 +142,6 @@ func (s *stageStore) Find(ctx context.Context, stageID int64) (*types.Stage, err
} }
// ListIncomplete returns a list of stages with a pending status // ListIncomplete returns a list of stages with a pending status
// TODO: Check whether mysql needs a separate syntax
// ref: https://github.com/harness/drone/blob/master/store/stage/stage.go#L110.
func (s *stageStore) ListIncomplete(ctx context.Context) ([]*types.Stage, error) { func (s *stageStore) ListIncomplete(ctx context.Context) ([]*types.Stage, error) {
const queryListIncomplete = ` const queryListIncomplete = `
SELECT` + stageColumns + ` SELECT` + stageColumns + `
@ -155,7 +153,7 @@ func (s *stageStore) ListIncomplete(ctx context.Context) ([]*types.Stage, error)
dst := []*stage{} dst := []*stage{}
if err := db.GetContext(ctx, dst, queryListIncomplete); err != nil { if err := db.GetContext(ctx, dst, queryListIncomplete); err != nil {
return nil, database.ProcessSQLErrorf(err, "Failed to find stage") return nil, database.ProcessSQLErrorf(err, "Failed to find incomplete stages")
} }
// map stages list // map stages list
return mapInternalToStageList(dst) return mapInternalToStageList(dst)

View File

@ -81,8 +81,8 @@ func mapStageToInternal(in *types.Stage) *stage {
Version: in.Version, Version: in.Version,
OnSuccess: in.OnSuccess, OnSuccess: in.OnSuccess,
OnFailure: in.OnFailure, OnFailure: in.OnFailure,
DependsOn: EncodeToJSON(in.DependsOn), DependsOn: EncodeToSQLXJSON(in.DependsOn),
Labels: EncodeToJSON(in.Labels), Labels: EncodeToSQLXJSON(in.Labels),
} }
} }

View File

@ -67,8 +67,8 @@ type stepStore struct {
db *sqlx.DB db *sqlx.DB
} }
// FindNumber returns a step given a stage ID and a step number. // FindByNumber returns a step given a stage ID and a step number.
func (s *stepStore) FindNumber(ctx context.Context, stageID int64, stepNum int) (*types.Step, error) { func (s *stepStore) FindByNumber(ctx context.Context, stageID int64, stepNum int) (*types.Step, error) {
const findQueryStmt = ` const findQueryStmt = `
SELECT` + stepColumns + ` SELECT` + stepColumns + `
FROM steps FROM steps

View File

@ -49,7 +49,7 @@ func mapStepToInternal(in *types.Step) *step {
Started: in.Started, Started: in.Started,
Stopped: in.Stopped, Stopped: in.Stopped,
Version: in.Version, Version: in.Version,
DependsOn: EncodeToJSON(in.DependsOn), DependsOn: EncodeToSQLXJSON(in.DependsOn),
Image: in.Image, Image: in.Image,
Detached: in.Detached, Detached: in.Detached,
Schema: in.Schema, Schema: in.Schema,

View File

@ -7,6 +7,7 @@ package logs
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"io" "io"
"github.com/harness/gitness/internal/store" "github.com/harness/gitness/internal/store"
@ -14,7 +15,6 @@ import (
"github.com/harness/gitness/store/database/dbtx" "github.com/harness/gitness/store/database/dbtx"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/pkg/errors"
) )
var _ store.LogStore = (*logStore)(nil) var _ store.LogStore = (*logStore)(nil)
@ -66,7 +66,7 @@ func (s *logStore) Create(ctx context.Context, stepID int64, r io.Reader) error
,:log_data` ,:log_data`
data, err := io.ReadAll(r) data, err := io.ReadAll(r)
if err != nil { if err != nil {
return errors.Wrap(err, "could not read log data") return fmt.Errorf("could not read log data: %w", err)
} }
params := &logs{ params := &logs{
ID: stepID, ID: stepID,
@ -80,7 +80,7 @@ func (s *logStore) Create(ctx context.Context, stepID int64, r io.Reader) error
return database.ProcessSQLErrorf(err, "Failed to bind log object") return database.ProcessSQLErrorf(err, "Failed to bind log object")
} }
if err = db.QueryRowContext(ctx, query, arg...).Scan(&params.ID); err != nil { if _, err := db.ExecContext(ctx, query, arg...); err != nil {
return database.ProcessSQLErrorf(err, "log query failed") return database.ProcessSQLErrorf(err, "log query failed")
} }
@ -96,7 +96,7 @@ func (s *logStore) Update(ctx context.Context, stepID int64, r io.Reader) error
WHERE log_id = :log_id` WHERE log_id = :log_id`
data, err := io.ReadAll(r) data, err := io.ReadAll(r)
if err != nil { if err != nil {
return errors.Wrap(err, "could not read log data") return fmt.Errorf("could not read log data: %w", err)
} }
db := dbtx.GetAccessor(ctx, s.db) db := dbtx.GetAccessor(ctx, s.db)

View File

@ -90,7 +90,7 @@ type Config struct {
Expire time.Duration `envconfig:"GITNESS_TOKEN_EXPIRE" default:"720h"` Expire time.Duration `envconfig:"GITNESS_TOKEN_EXPIRE" default:"720h"`
} }
// S3 provides optional storage option for logs // S3 provides optional storage option for logs.
S3 struct { S3 struct {
Bucket string `envconfig:"GITNESS_S3_BUCKET"` Bucket string `envconfig:"GITNESS_S3_BUCKET"`
Prefix string `envconfig:"GITNESS_S3_PREFIX"` Prefix string `envconfig:"GITNESS_S3_PREFIX"`

View File

@ -38,8 +38,8 @@ type Execution struct {
Debug bool `json:"debug,omitempty"` Debug bool `json:"debug,omitempty"`
Started int64 `json:"started,omitempty"` Started int64 `json:"started,omitempty"`
Finished int64 `json:"finished,omitempty"` Finished int64 `json:"finished,omitempty"`
Created int64 `json:"-"` Created int64 `json:"created"`
Updated int64 `json:"-"` Updated int64 `json:"updated"`
Version int64 `json:"-"` Version int64 `json:"-"`
Stages []*Stage `json:"stages,omitempty"` Stages []*Stage `json:"stages,omitempty"`
} }

View File

@ -11,7 +11,7 @@ type Secret struct {
UID string `db:"secret_uid" json:"uid"` UID string `db:"secret_uid" json:"uid"`
Data string `db:"secret_data" json:"-"` Data string `db:"secret_data" json:"-"`
Created int64 `db:"secret_created" json:"created"` Created int64 `db:"secret_created" json:"created"`
Updated int64 `db:"secret_updated" json:"-"` Updated int64 `db:"secret_updated" json:"updated"`
Version int64 `db:"secret_version" json:"-"` Version int64 `db:"secret_version" json:"-"`
} }