fix sse space events API blocking app shutdown (#774)

This commit is contained in:
Marko Gacesa 2023-11-08 07:24:14 +00:00 committed by Harness
parent d0d21214d4
commit b6ee8b6ba9
10 changed files with 213 additions and 183 deletions

View File

@ -16,122 +16,29 @@ package space
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
apiauth "github.com/harness/gitness/app/api/auth"
"github.com/harness/gitness/app/auth"
gitnessio "github.com/harness/gitness/app/io"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/types/enum"
"github.com/rs/zerolog/log"
)
var (
pingInterval = 30 * time.Second
tailMaxTime = 2 * time.Hour
)
//nolint:gocognit // refactor if needed
func (c *Controller) Events(
ctx context.Context,
session *auth.Session,
spaceRef string,
w gitnessio.WriterFlusher,
) error {
) (<-chan *sse.Event, <-chan error, func(context.Context) error, error) {
space, err := c.spaceStore.FindByRef(ctx, spaceRef)
if err != nil {
return fmt.Errorf("failed to find space ref: %w", err)
return nil, nil, nil, fmt.Errorf("failed to find space ref: %w", err)
}
if err = apiauth.CheckSpace(ctx, c.authorizer, session, space, enum.PermissionSpaceView, true); err != nil {
return fmt.Errorf("failed to authorize stream: %w", err)
return nil, nil, nil, fmt.Errorf("failed to authorize stream: %w", err)
}
ctx, ctxCancel := context.WithTimeout(ctx, tailMaxTime)
defer ctxCancel()
chEvents, chErr, sseCancel := c.sseStreamer.Stream(ctx, space.ID)
_, err = io.WriteString(w, ": ping\n\n")
if err != nil {
return fmt.Errorf("failed to send initial ping: %w", err)
}
w.Flush()
eventStream, errorStream, sseCancel := c.sseStreamer.Stream(ctx, space.ID)
defer func() {
uerr := sseCancel(ctx)
if uerr != nil {
log.Ctx(ctx).Warn().Err(uerr).Msgf("failed to cancel sse stream for space '%s'", space.Path)
}
}()
// could not get error channel
if errorStream == nil {
_, _ = io.WriteString(w, "event: error\ndata: eof\n\n")
w.Flush()
return fmt.Errorf("could not get error channel")
}
pingTimer := time.NewTimer(pingInterval)
defer pingTimer.Stop()
enc := json.NewEncoder(w)
L:
for {
// ensure timer is stopped before resetting (see documentation)
if !pingTimer.Stop() {
// in this specific case the timer's channel could be both, empty or full
select {
case <-pingTimer.C:
default:
}
}
pingTimer.Reset(pingInterval)
select {
case <-ctx.Done():
log.Ctx(ctx).Debug().Msg("events: stream cancelled")
break L
case err := <-errorStream:
log.Err(err).Msg("events: received error in the tail channel")
break L
case <-pingTimer.C:
// if time b/w messages takes longer, send a ping
_, err = io.WriteString(w, ": ping\n\n")
if err != nil {
return fmt.Errorf("failed to send ping: %w", err)
}
w.Flush()
case event := <-eventStream:
_, err = io.WriteString(w, fmt.Sprintf("event: %s\n", event.Type))
if err != nil {
return fmt.Errorf("failed to send event header: %w", err)
}
_, err = io.WriteString(w, "data: ")
if err != nil {
return fmt.Errorf("failed to send data header: %w", err)
}
err = enc.Encode(event.Data)
if err != nil {
return fmt.Errorf("failed to send data: %w", err)
}
// NOTE: enc.Encode is ending the data with a new line, only add one more
// Source: https://cs.opensource.google/go/go/+/refs/tags/go1.21.1:src/encoding/json/stream.go;l=220
_, err = io.WriteString(w, "\n")
if err != nil {
return fmt.Errorf("failed to send end of message: %w", err)
}
w.Flush()
}
}
_, err = io.WriteString(w, "event: error\ndata: eof\n\n")
if err != nil {
return fmt.Errorf("failed to send eof: %w", err)
}
w.Flush()
log.Ctx(ctx).Debug().Msg("events: stream closed")
return nil
return chEvents, chErr, sseCancel, nil
}

View File

@ -15,19 +15,18 @@
package space
import (
"context"
"net/http"
"github.com/harness/gitness/app/api/controller/space"
"github.com/harness/gitness/app/api/render"
"github.com/harness/gitness/app/api/request"
"github.com/harness/gitness/app/io"
"github.com/rs/zerolog/log"
)
// HandleEvents returns an http.HandlerFunc that watches for
// events on a space.
func HandleEvents(spaceCtrl *space.Controller) http.HandlerFunc {
// HandleEvents returns a http.HandlerFunc that watches for events on a space.
func HandleEvents(appCtx context.Context, spaceCtrl *space.Controller) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
session, _ := request.AuthSessionFrom(ctx)
@ -38,26 +37,17 @@ func HandleEvents(spaceCtrl *space.Controller) http.HandlerFunc {
return
}
h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "keep-alive")
h.Set("X-Accel-Buffering", "no")
h.Set("Access-Control-Allow-Origin", "*")
f, ok := w.(http.Flusher)
if !ok {
log.Error().Msg("http writer type assertion failed")
render.InternalError(w)
return
}
writer := io.NewWriterFlusher(w, f)
err = spaceCtrl.Events(ctx, session, spaceRef, writer)
chEvents, chErr, sseCancel, err := spaceCtrl.Events(ctx, session, spaceRef)
if err != nil {
render.TranslatedUserError(w, err)
return
}
defer func() {
if err := sseCancel(ctx); err != nil {
log.Ctx(ctx).Err(err).Msgf("failed to cancel sse stream for space '%s'", spaceRef)
}
}()
render.StreamSSE(ctx, w, appCtx.Done(), chEvents, chErr)
}
}

170
app/api/render/sse.go Normal file
View File

@ -0,0 +1,170 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package render
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/harness/gitness/app/api/usererror"
"github.com/harness/gitness/app/sse"
"github.com/rs/zerolog/log"
)
func StreamSSE(
ctx context.Context,
w http.ResponseWriter,
chStop <-chan struct{},
chEvents <-chan *sse.Event,
chErr <-chan error,
) {
flusher, ok := w.(http.Flusher)
if !ok {
UserError(w, usererror.ErrResponseNotFlushable)
log.Ctx(ctx).Warn().Err(usererror.ErrResponseNotFlushable).Send()
return
}
h := w.Header()
h.Set("Content-Type", "text/event-stream")
h.Set("Cache-Control", "no-cache")
h.Set("Connection", "keep-alive")
h.Set("X-Accel-Buffering", "no")
h.Set("Access-Control-Allow-Origin", "*")
enc := json.NewEncoder(w)
enc.SetEscapeHTML(false)
stream := sseStream{
enc: enc,
writer: w,
flusher: flusher,
}
const (
pingInterval = 30 * time.Second
tailMaxTime = 2 * time.Hour
)
ctx, ctxCancel := context.WithTimeout(ctx, tailMaxTime)
defer ctxCancel()
if err := stream.ping(); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to send initial ping")
return
}
defer func() {
if err := stream.close(); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to close SSE stream")
}
}()
pingTimer := time.NewTimer(pingInterval)
defer pingTimer.Stop()
for {
select {
case <-ctx.Done():
log.Ctx(ctx).Debug().Err(ctx.Err()).Msg("stream SSE request context done")
return
case <-chStop:
log.Ctx(ctx).Debug().Msg("app shutdown")
return
case err := <-chErr:
log.Ctx(ctx).Debug().Err(err).Msg("received error from SSE stream")
return
case <-pingTimer.C:
if err := stream.ping(); err != nil {
log.Ctx(ctx).Err(err).Msg("failed to send SSE ping")
return
}
case event := <-chEvents:
if err := stream.event(event); err != nil {
log.Ctx(ctx).Err(err).Msgf("failed to send SSE event: %s", event.Type)
return
}
}
pingTimer.Stop() // stop timer
select {
case <-pingTimer.C: // drain channel
default:
}
pingTimer.Reset(pingInterval) // reset timer
}
}
type sseStream struct {
enc *json.Encoder
writer io.Writer
flusher http.Flusher
}
func (r sseStream) event(event *sse.Event) error {
_, err := io.WriteString(r.writer, fmt.Sprintf("event: %s\n", event.Type))
if err != nil {
return fmt.Errorf("failed to send event header: %w", err)
}
_, err = io.WriteString(r.writer, "data: ")
if err != nil {
return fmt.Errorf("failed to send data header: %w", err)
}
err = r.enc.Encode(event.Data)
if err != nil {
return fmt.Errorf("failed to send data: %w", err)
}
// NOTE: enc.Encode is ending the data with a new line, only add one more
// Source: https://cs.opensource.google/go/go/+/refs/tags/go1.21.1:src/encoding/json/stream.go;l=220
_, err = r.writer.Write([]byte{'\n'})
if err != nil {
return fmt.Errorf("failed to send end of message: %w", err)
}
r.flusher.Flush()
return nil
}
func (r sseStream) close() error {
_, err := io.WriteString(r.writer, "event: error\ndata: eof\n\n")
if err != nil {
return fmt.Errorf("failed to send EOF: %w", err)
}
r.flusher.Flush()
return nil
}
func (r sseStream) ping() error {
_, err := io.WriteString(r.writer, ": ping\n\n")
if err != nil {
return fmt.Errorf("failed to send ping: %w", err)
}
r.flusher.Flush()
return nil
}

View File

@ -79,6 +79,9 @@ var (
// ErrCodeOwnersNotFound is returned when codeowners file is not found.
ErrCodeOwnersNotFound = New(http.StatusNotFound, "CODEOWNERS file not found")
// ErrResponseNotFlushable is returned if the response writer doesn't implement http.Flusher.
ErrResponseNotFlushable = New(http.StatusInternalServerError, "Response not streamable")
)
// Error represents a json-encoded API error.

View File

@ -1,48 +0,0 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io
import "io"
type Flusher interface {
Flush()
}
type writeWithFlusher struct {
writer io.Writer
flusher Flusher
}
// nolint
type WriterFlusher interface {
io.Writer
Flusher
}
func NewWriterFlusher(writer io.Writer, flusher Flusher) WriterFlusher {
return &writeWithFlusher{
writer: writer,
flusher: flusher,
}
}
func (w *writeWithFlusher) Write(p []byte) (int, error) {
n, err := w.writer.Write(p)
return n, err
}
func (w *writeWithFlusher) Flush() {
w.flusher.Flush()
}

View File

@ -15,6 +15,7 @@
package router
import (
"context"
"fmt"
"net/http"
@ -89,6 +90,7 @@ var (
// NewAPIHandler returns a new APIHandler.
func NewAPIHandler(
appCtx context.Context,
config *types.Config,
authenticator authn.Authenticator,
repoCtrl *repo.Controller,
@ -132,7 +134,7 @@ func NewAPIHandler(
r.Use(middlewareauthn.Attempt(authenticator))
r.Route("/v1", func(r chi.Router) {
setupRoutesV1(r, config, repoCtrl, executionCtrl, triggerCtrl, logCtrl, pipelineCtrl,
setupRoutesV1(r, appCtx, config, repoCtrl, executionCtrl, triggerCtrl, logCtrl, pipelineCtrl,
connectorCtrl, templateCtrl, pluginCtrl, secretCtrl, spaceCtrl, pullreqCtrl,
webhookCtrl, githookCtrl, saCtrl, userCtrl, principalCtrl, checkCtrl, sysCtrl, uploadCtrl)
})
@ -154,7 +156,9 @@ func corsHandler(config *types.Config) func(http.Handler) http.Handler {
).Handler
}
// nolint: revive // it's the app context, it shouldn't be the first argument
func setupRoutesV1(r chi.Router,
appCtx context.Context,
config *types.Config,
repoCtrl *repo.Controller,
executionCtrl *execution.Controller,
@ -176,7 +180,7 @@ func setupRoutesV1(r chi.Router,
sysCtrl *system.Controller,
uploadCtrl *upload.Controller,
) {
setupSpaces(r, spaceCtrl)
setupSpaces(r, appCtx, spaceCtrl)
setupRepos(r, repoCtrl, pipelineCtrl, executionCtrl, triggerCtrl, logCtrl, pullreqCtrl, webhookCtrl, checkCtrl,
uploadCtrl)
setupConnectors(r, connectorCtrl)
@ -193,7 +197,8 @@ func setupRoutesV1(r chi.Router,
setupPlugins(r, pluginCtrl)
}
func setupSpaces(r chi.Router, spaceCtrl *space.Controller) {
// nolint: revive // it's the app context, it shouldn't be the first argument
func setupSpaces(r chi.Router, appCtx context.Context, spaceCtrl *space.Controller) {
r.Route("/spaces", func(r chi.Router) {
// Create takes path and parentId via body, not uri
r.Post("/", handlerspace.HandleCreate(spaceCtrl))
@ -205,7 +210,7 @@ func setupSpaces(r chi.Router, spaceCtrl *space.Controller) {
r.Patch("/", handlerspace.HandleUpdate(spaceCtrl))
r.Delete("/", handlerspace.HandleDelete(spaceCtrl))
r.Get("/events", handlerspace.HandleEvents(spaceCtrl))
r.Get("/events", handlerspace.HandleEvents(appCtx, spaceCtrl))
r.Post("/move", handlerspace.HandleMove(spaceCtrl))
r.Get("/spaces", handlerspace.HandleListSpaces(spaceCtrl))

View File

@ -15,6 +15,7 @@
package router
import (
"context"
"strings"
"github.com/harness/gitness/app/api/controller/check"
@ -84,6 +85,7 @@ func ProvideGitHandler(
}
func ProvideAPIHandler(
appCtx context.Context,
config *types.Config,
authenticator authn.Authenticator,
repoCtrl *repo.Controller,
@ -106,7 +108,8 @@ func ProvideAPIHandler(
sysCtrl *system.Controller,
blobCtrl *upload.Controller,
) APIHandler {
return NewAPIHandler(config, authenticator, repoCtrl, executionCtrl, logCtrl, spaceCtrl, pipelineCtrl,
return NewAPIHandler(appCtx, config,
authenticator, repoCtrl, executionCtrl, logCtrl, spaceCtrl, pipelineCtrl,
secretCtrl, triggerCtrl, connectorCtrl, templateCtrl, pluginCtrl, pullreqCtrl, webhookCtrl,
githookCtrl, saCtrl, userCtrl, principalCtrl, checkCtrl, sysCtrl, blobCtrl)
}

View File

@ -34,7 +34,7 @@ type Streamer interface {
// Publish publishes an event to a given space ID.
Publish(ctx context.Context, spaceID int64, eventType enum.SSEType, data any) error
// Streams streams the events on a space ID.
// Stream streams the events on a space ID.
Stream(ctx context.Context, spaceID int64) (<-chan *Event, <-chan error, func(context.Context) error)
}

View File

@ -243,7 +243,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
return nil, err
}
uploadController := upload.ProvideController(authorizer, repoStore, blobStore)
apiHandler := router.ProvideAPIHandler(config, authenticator, repoController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, serviceaccountController, controller, principalController, checkController, systemController, uploadController)
apiHandler := router.ProvideAPIHandler(ctx, config, authenticator, repoController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, serviceaccountController, controller, principalController, checkController, systemController, uploadController)
gitHandler := router.ProvideGitHandler(provider, authenticator, repoController)
webHandler := router.ProvideWebHandler(config)
routerRouter := router.ProvideRouter(apiHandler, gitHandler, webHandler, provider)

View File

@ -12,20 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Enums for event types delivered to the event stream for the UI
package enum
// SSEType defines the kind of server sent event.
type SSEType string
// Enums for event types delivered to the event stream for the UI.
const (
SSETypeExecutionUpdated = "execution_updated"
SSETypeExecutionRunning = "execution_running"
SSETypeExecutionCompleted = "execution_completed"
SSETypeExecutionCanceled = "execution_canceled"
SSETypeExecutionUpdated SSEType = "execution_updated"
SSETypeExecutionRunning SSEType = "execution_running"
SSETypeExecutionCompleted SSEType = "execution_completed"
SSETypeExecutionCanceled SSEType = "execution_canceled"
SSETypeRepositoryImportCompleted = "repository_import_completed"
SSETypeRepositoryExportCompleted = "repository_export_completed"
SSETypeRepositoryImportCompleted SSEType = "repository_import_completed"
SSETypeRepositoryExportCompleted SSEType = "repository_export_completed"
SSETypePullrequesUpdated = "pullreq_updated"
SSETypePullrequesUpdated SSEType = "pullreq_updated"
)