From b6ee8b6ba9c6669e75f236aa209d08a732a9fea0 Mon Sep 17 00:00:00 2001 From: Marko Gacesa Date: Wed, 8 Nov 2023 07:24:14 +0000 Subject: [PATCH] fix sse space events API blocking app shutdown (#774) --- app/api/controller/space/events.go | 105 +----------------- app/api/handler/space/events.go | 32 ++---- app/api/render/sse.go | 170 +++++++++++++++++++++++++++++ app/api/usererror/usererror.go | 3 + app/io/writeflush.go | 48 -------- app/router/api.go | 13 ++- app/router/wire.go | 5 +- app/sse/sse.go | 2 +- cmd/gitness/wire_gen.go | 2 +- types/enum/sse.go | 16 +-- 10 files changed, 213 insertions(+), 183 deletions(-) create mode 100644 app/api/render/sse.go delete mode 100644 app/io/writeflush.go diff --git a/app/api/controller/space/events.go b/app/api/controller/space/events.go index 4b447fa41..3f4083f85 100644 --- a/app/api/controller/space/events.go +++ b/app/api/controller/space/events.go @@ -16,122 +16,29 @@ package space import ( "context" - "encoding/json" "fmt" - "io" - "time" apiauth "github.com/harness/gitness/app/api/auth" "github.com/harness/gitness/app/auth" - gitnessio "github.com/harness/gitness/app/io" + "github.com/harness/gitness/app/sse" "github.com/harness/gitness/types/enum" - - "github.com/rs/zerolog/log" ) -var ( - pingInterval = 30 * time.Second - tailMaxTime = 2 * time.Hour -) - -//nolint:gocognit // refactor if needed func (c *Controller) Events( ctx context.Context, session *auth.Session, spaceRef string, - w gitnessio.WriterFlusher, -) error { +) (<-chan *sse.Event, <-chan error, func(context.Context) error, error) { space, err := c.spaceStore.FindByRef(ctx, spaceRef) if err != nil { - return fmt.Errorf("failed to find space ref: %w", err) + return nil, nil, nil, fmt.Errorf("failed to find space ref: %w", err) } if err = apiauth.CheckSpace(ctx, c.authorizer, session, space, enum.PermissionSpaceView, true); err != nil { - return fmt.Errorf("failed to authorize stream: %w", err) + return nil, nil, nil, fmt.Errorf("failed to authorize stream: %w", err) } - ctx, ctxCancel := context.WithTimeout(ctx, tailMaxTime) - defer ctxCancel() + chEvents, chErr, sseCancel := c.sseStreamer.Stream(ctx, space.ID) - _, err = io.WriteString(w, ": ping\n\n") - if err != nil { - return fmt.Errorf("failed to send initial ping: %w", err) - } - w.Flush() - - eventStream, errorStream, sseCancel := c.sseStreamer.Stream(ctx, space.ID) - defer func() { - uerr := sseCancel(ctx) - if uerr != nil { - log.Ctx(ctx).Warn().Err(uerr).Msgf("failed to cancel sse stream for space '%s'", space.Path) - } - }() - // could not get error channel - if errorStream == nil { - _, _ = io.WriteString(w, "event: error\ndata: eof\n\n") - w.Flush() - return fmt.Errorf("could not get error channel") - } - pingTimer := time.NewTimer(pingInterval) - defer pingTimer.Stop() - - enc := json.NewEncoder(w) -L: - for { - // ensure timer is stopped before resetting (see documentation) - if !pingTimer.Stop() { - // in this specific case the timer's channel could be both, empty or full - select { - case <-pingTimer.C: - default: - } - } - pingTimer.Reset(pingInterval) - select { - case <-ctx.Done(): - log.Ctx(ctx).Debug().Msg("events: stream cancelled") - break L - case err := <-errorStream: - log.Err(err).Msg("events: received error in the tail channel") - break L - case <-pingTimer.C: - // if time b/w messages takes longer, send a ping - _, err = io.WriteString(w, ": ping\n\n") - if err != nil { - return fmt.Errorf("failed to send ping: %w", err) - } - w.Flush() - case event := <-eventStream: - _, err = io.WriteString(w, fmt.Sprintf("event: %s\n", event.Type)) - if err != nil { - return fmt.Errorf("failed to send event header: %w", err) - } - _, err = io.WriteString(w, "data: ") - if err != nil { - return fmt.Errorf("failed to send data header: %w", err) - } - err = enc.Encode(event.Data) - if err != nil { - return fmt.Errorf("failed to send data: %w", err) - } - // NOTE: enc.Encode is ending the data with a new line, only add one more - // Source: https://cs.opensource.google/go/go/+/refs/tags/go1.21.1:src/encoding/json/stream.go;l=220 - _, err = io.WriteString(w, "\n") - if err != nil { - return fmt.Errorf("failed to send end of message: %w", err) - } - - w.Flush() - } - } - - _, err = io.WriteString(w, "event: error\ndata: eof\n\n") - if err != nil { - return fmt.Errorf("failed to send eof: %w", err) - } - w.Flush() - - log.Ctx(ctx).Debug().Msg("events: stream closed") - - return nil + return chEvents, chErr, sseCancel, nil } diff --git a/app/api/handler/space/events.go b/app/api/handler/space/events.go index 5076d257e..b17dd7fe6 100644 --- a/app/api/handler/space/events.go +++ b/app/api/handler/space/events.go @@ -15,19 +15,18 @@ package space import ( + "context" "net/http" "github.com/harness/gitness/app/api/controller/space" "github.com/harness/gitness/app/api/render" "github.com/harness/gitness/app/api/request" - "github.com/harness/gitness/app/io" "github.com/rs/zerolog/log" ) -// HandleEvents returns an http.HandlerFunc that watches for -// events on a space. -func HandleEvents(spaceCtrl *space.Controller) http.HandlerFunc { +// HandleEvents returns a http.HandlerFunc that watches for events on a space. +func HandleEvents(appCtx context.Context, spaceCtrl *space.Controller) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() session, _ := request.AuthSessionFrom(ctx) @@ -38,26 +37,17 @@ func HandleEvents(spaceCtrl *space.Controller) http.HandlerFunc { return } - h := w.Header() - h.Set("Content-Type", "text/event-stream") - h.Set("Cache-Control", "no-cache") - h.Set("Connection", "keep-alive") - h.Set("X-Accel-Buffering", "no") - h.Set("Access-Control-Allow-Origin", "*") - - f, ok := w.(http.Flusher) - if !ok { - log.Error().Msg("http writer type assertion failed") - render.InternalError(w) - return - } - - writer := io.NewWriterFlusher(w, f) - - err = spaceCtrl.Events(ctx, session, spaceRef, writer) + chEvents, chErr, sseCancel, err := spaceCtrl.Events(ctx, session, spaceRef) if err != nil { render.TranslatedUserError(w, err) return } + defer func() { + if err := sseCancel(ctx); err != nil { + log.Ctx(ctx).Err(err).Msgf("failed to cancel sse stream for space '%s'", spaceRef) + } + }() + + render.StreamSSE(ctx, w, appCtx.Done(), chEvents, chErr) } } diff --git a/app/api/render/sse.go b/app/api/render/sse.go new file mode 100644 index 000000000..8ac33f245 --- /dev/null +++ b/app/api/render/sse.go @@ -0,0 +1,170 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package render + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/harness/gitness/app/api/usererror" + "github.com/harness/gitness/app/sse" + + "github.com/rs/zerolog/log" +) + +func StreamSSE( + ctx context.Context, + w http.ResponseWriter, + chStop <-chan struct{}, + chEvents <-chan *sse.Event, + chErr <-chan error, +) { + flusher, ok := w.(http.Flusher) + if !ok { + UserError(w, usererror.ErrResponseNotFlushable) + log.Ctx(ctx).Warn().Err(usererror.ErrResponseNotFlushable).Send() + return + } + + h := w.Header() + h.Set("Content-Type", "text/event-stream") + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "keep-alive") + h.Set("X-Accel-Buffering", "no") + h.Set("Access-Control-Allow-Origin", "*") + + enc := json.NewEncoder(w) + enc.SetEscapeHTML(false) + + stream := sseStream{ + enc: enc, + writer: w, + flusher: flusher, + } + + const ( + pingInterval = 30 * time.Second + tailMaxTime = 2 * time.Hour + ) + + ctx, ctxCancel := context.WithTimeout(ctx, tailMaxTime) + defer ctxCancel() + + if err := stream.ping(); err != nil { + log.Ctx(ctx).Err(err).Msg("failed to send initial ping") + return + } + + defer func() { + if err := stream.close(); err != nil { + log.Ctx(ctx).Err(err).Msg("failed to close SSE stream") + } + }() + + pingTimer := time.NewTimer(pingInterval) + defer pingTimer.Stop() + + for { + select { + case <-ctx.Done(): + log.Ctx(ctx).Debug().Err(ctx.Err()).Msg("stream SSE request context done") + return + + case <-chStop: + log.Ctx(ctx).Debug().Msg("app shutdown") + return + + case err := <-chErr: + log.Ctx(ctx).Debug().Err(err).Msg("received error from SSE stream") + return + + case <-pingTimer.C: + if err := stream.ping(); err != nil { + log.Ctx(ctx).Err(err).Msg("failed to send SSE ping") + return + } + + case event := <-chEvents: + if err := stream.event(event); err != nil { + log.Ctx(ctx).Err(err).Msgf("failed to send SSE event: %s", event.Type) + return + } + } + + pingTimer.Stop() // stop timer + + select { + case <-pingTimer.C: // drain channel + default: + } + + pingTimer.Reset(pingInterval) // reset timer + } +} + +type sseStream struct { + enc *json.Encoder + writer io.Writer + flusher http.Flusher +} + +func (r sseStream) event(event *sse.Event) error { + _, err := io.WriteString(r.writer, fmt.Sprintf("event: %s\n", event.Type)) + if err != nil { + return fmt.Errorf("failed to send event header: %w", err) + } + + _, err = io.WriteString(r.writer, "data: ") + if err != nil { + return fmt.Errorf("failed to send data header: %w", err) + } + + err = r.enc.Encode(event.Data) + if err != nil { + return fmt.Errorf("failed to send data: %w", err) + } + + // NOTE: enc.Encode is ending the data with a new line, only add one more + // Source: https://cs.opensource.google/go/go/+/refs/tags/go1.21.1:src/encoding/json/stream.go;l=220 + _, err = r.writer.Write([]byte{'\n'}) + if err != nil { + return fmt.Errorf("failed to send end of message: %w", err) + } + + r.flusher.Flush() + return nil +} + +func (r sseStream) close() error { + _, err := io.WriteString(r.writer, "event: error\ndata: eof\n\n") + if err != nil { + return fmt.Errorf("failed to send EOF: %w", err) + } + r.flusher.Flush() + return nil +} + +func (r sseStream) ping() error { + _, err := io.WriteString(r.writer, ": ping\n\n") + if err != nil { + return fmt.Errorf("failed to send ping: %w", err) + } + r.flusher.Flush() + return nil +} diff --git a/app/api/usererror/usererror.go b/app/api/usererror/usererror.go index 49d2ac7bf..b07f653e6 100644 --- a/app/api/usererror/usererror.go +++ b/app/api/usererror/usererror.go @@ -79,6 +79,9 @@ var ( // ErrCodeOwnersNotFound is returned when codeowners file is not found. ErrCodeOwnersNotFound = New(http.StatusNotFound, "CODEOWNERS file not found") + + // ErrResponseNotFlushable is returned if the response writer doesn't implement http.Flusher. + ErrResponseNotFlushable = New(http.StatusInternalServerError, "Response not streamable") ) // Error represents a json-encoded API error. diff --git a/app/io/writeflush.go b/app/io/writeflush.go deleted file mode 100644 index 7ee76c590..000000000 --- a/app/io/writeflush.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2023 Harness, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package io - -import "io" - -type Flusher interface { - Flush() -} - -type writeWithFlusher struct { - writer io.Writer - flusher Flusher -} - -// nolint -type WriterFlusher interface { - io.Writer - Flusher -} - -func NewWriterFlusher(writer io.Writer, flusher Flusher) WriterFlusher { - return &writeWithFlusher{ - writer: writer, - flusher: flusher, - } -} - -func (w *writeWithFlusher) Write(p []byte) (int, error) { - n, err := w.writer.Write(p) - return n, err -} - -func (w *writeWithFlusher) Flush() { - w.flusher.Flush() -} diff --git a/app/router/api.go b/app/router/api.go index dcb7edcd0..3ad8c7e05 100644 --- a/app/router/api.go +++ b/app/router/api.go @@ -15,6 +15,7 @@ package router import ( + "context" "fmt" "net/http" @@ -89,6 +90,7 @@ var ( // NewAPIHandler returns a new APIHandler. func NewAPIHandler( + appCtx context.Context, config *types.Config, authenticator authn.Authenticator, repoCtrl *repo.Controller, @@ -132,7 +134,7 @@ func NewAPIHandler( r.Use(middlewareauthn.Attempt(authenticator)) r.Route("/v1", func(r chi.Router) { - setupRoutesV1(r, config, repoCtrl, executionCtrl, triggerCtrl, logCtrl, pipelineCtrl, + setupRoutesV1(r, appCtx, config, repoCtrl, executionCtrl, triggerCtrl, logCtrl, pipelineCtrl, connectorCtrl, templateCtrl, pluginCtrl, secretCtrl, spaceCtrl, pullreqCtrl, webhookCtrl, githookCtrl, saCtrl, userCtrl, principalCtrl, checkCtrl, sysCtrl, uploadCtrl) }) @@ -154,7 +156,9 @@ func corsHandler(config *types.Config) func(http.Handler) http.Handler { ).Handler } +// nolint: revive // it's the app context, it shouldn't be the first argument func setupRoutesV1(r chi.Router, + appCtx context.Context, config *types.Config, repoCtrl *repo.Controller, executionCtrl *execution.Controller, @@ -176,7 +180,7 @@ func setupRoutesV1(r chi.Router, sysCtrl *system.Controller, uploadCtrl *upload.Controller, ) { - setupSpaces(r, spaceCtrl) + setupSpaces(r, appCtx, spaceCtrl) setupRepos(r, repoCtrl, pipelineCtrl, executionCtrl, triggerCtrl, logCtrl, pullreqCtrl, webhookCtrl, checkCtrl, uploadCtrl) setupConnectors(r, connectorCtrl) @@ -193,7 +197,8 @@ func setupRoutesV1(r chi.Router, setupPlugins(r, pluginCtrl) } -func setupSpaces(r chi.Router, spaceCtrl *space.Controller) { +// nolint: revive // it's the app context, it shouldn't be the first argument +func setupSpaces(r chi.Router, appCtx context.Context, spaceCtrl *space.Controller) { r.Route("/spaces", func(r chi.Router) { // Create takes path and parentId via body, not uri r.Post("/", handlerspace.HandleCreate(spaceCtrl)) @@ -205,7 +210,7 @@ func setupSpaces(r chi.Router, spaceCtrl *space.Controller) { r.Patch("/", handlerspace.HandleUpdate(spaceCtrl)) r.Delete("/", handlerspace.HandleDelete(spaceCtrl)) - r.Get("/events", handlerspace.HandleEvents(spaceCtrl)) + r.Get("/events", handlerspace.HandleEvents(appCtx, spaceCtrl)) r.Post("/move", handlerspace.HandleMove(spaceCtrl)) r.Get("/spaces", handlerspace.HandleListSpaces(spaceCtrl)) diff --git a/app/router/wire.go b/app/router/wire.go index 80b076a0a..b262e7612 100644 --- a/app/router/wire.go +++ b/app/router/wire.go @@ -15,6 +15,7 @@ package router import ( + "context" "strings" "github.com/harness/gitness/app/api/controller/check" @@ -84,6 +85,7 @@ func ProvideGitHandler( } func ProvideAPIHandler( + appCtx context.Context, config *types.Config, authenticator authn.Authenticator, repoCtrl *repo.Controller, @@ -106,7 +108,8 @@ func ProvideAPIHandler( sysCtrl *system.Controller, blobCtrl *upload.Controller, ) APIHandler { - return NewAPIHandler(config, authenticator, repoCtrl, executionCtrl, logCtrl, spaceCtrl, pipelineCtrl, + return NewAPIHandler(appCtx, config, + authenticator, repoCtrl, executionCtrl, logCtrl, spaceCtrl, pipelineCtrl, secretCtrl, triggerCtrl, connectorCtrl, templateCtrl, pluginCtrl, pullreqCtrl, webhookCtrl, githookCtrl, saCtrl, userCtrl, principalCtrl, checkCtrl, sysCtrl, blobCtrl) } diff --git a/app/sse/sse.go b/app/sse/sse.go index 78e3b1a17..2e94390af 100644 --- a/app/sse/sse.go +++ b/app/sse/sse.go @@ -34,7 +34,7 @@ type Streamer interface { // Publish publishes an event to a given space ID. Publish(ctx context.Context, spaceID int64, eventType enum.SSEType, data any) error - // Streams streams the events on a space ID. + // Stream streams the events on a space ID. Stream(ctx context.Context, spaceID int64) (<-chan *Event, <-chan error, func(context.Context) error) } diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index aca1d8e62..c061ce506 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -243,7 +243,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro return nil, err } uploadController := upload.ProvideController(authorizer, repoStore, blobStore) - apiHandler := router.ProvideAPIHandler(config, authenticator, repoController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, serviceaccountController, controller, principalController, checkController, systemController, uploadController) + apiHandler := router.ProvideAPIHandler(ctx, config, authenticator, repoController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, serviceaccountController, controller, principalController, checkController, systemController, uploadController) gitHandler := router.ProvideGitHandler(provider, authenticator, repoController) webHandler := router.ProvideWebHandler(config) routerRouter := router.ProvideRouter(apiHandler, gitHandler, webHandler, provider) diff --git a/types/enum/sse.go b/types/enum/sse.go index 049cacd0a..081396809 100644 --- a/types/enum/sse.go +++ b/types/enum/sse.go @@ -12,20 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Enums for event types delivered to the event stream for the UI package enum // SSEType defines the kind of server sent event. type SSEType string +// Enums for event types delivered to the event stream for the UI. const ( - SSETypeExecutionUpdated = "execution_updated" - SSETypeExecutionRunning = "execution_running" - SSETypeExecutionCompleted = "execution_completed" - SSETypeExecutionCanceled = "execution_canceled" + SSETypeExecutionUpdated SSEType = "execution_updated" + SSETypeExecutionRunning SSEType = "execution_running" + SSETypeExecutionCompleted SSEType = "execution_completed" + SSETypeExecutionCanceled SSEType = "execution_canceled" - SSETypeRepositoryImportCompleted = "repository_import_completed" - SSETypeRepositoryExportCompleted = "repository_export_completed" + SSETypeRepositoryImportCompleted SSEType = "repository_import_completed" + SSETypeRepositoryExportCompleted SSEType = "repository_export_completed" - SSETypePullrequesUpdated = "pullreq_updated" + SSETypePullrequesUpdated SSEType = "pullreq_updated" )